[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411866242



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411866042



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
     val other = d.asInstanceOf[DelayedItem]
-    java.lang.Long.compare(dueMs, other.dueMs)
+    java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+    "DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   I changed it to convert to millis since nanoseconds aren't very readable 
and this is closer to the existing behavior.
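For illustration, a minimal sketch of that millisecond conversion (the helper name `describe` is hypothetical; the real code would inline this in `toString`):

```java
import java.util.concurrent.TimeUnit;

public class ToStringMillis {
    // Hypothetical helper: render the remaining delay in milliseconds,
    // assuming dueNs and nowNs both come from the same monotonic clock.
    static String describe(long dueNs, long nowNs) {
        long remainingMs = TimeUnit.NANOSECONDS.toMillis(dueNs - nowNs);
        return "DelayedItem(delayMs=" + remainingMs + ")";
    }

    public static void main(String[] args) {
        System.out.println(describe(7_000_000L, 2_000_000L)); // DelayedItem(delayMs=5)
    }
}
```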

##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Fixed









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411864699



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Good point. I'll remove that. I don't think we use it.









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411864205



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)

Review comment:
   Any idea how to efficiently handle the overflow here? I think it's very 
unlikely we'll ever hit it given our normal delays.
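One possible way to handle the overflow concern, sketched as an illustration rather than the PR's actual fix (the helper name `saturatingDueNs` is hypothetical): `TimeUnit.MILLISECONDS.toNanos` already saturates at `Long.MAX_VALUE` per its javadoc, so only the subsequent addition can wrap, and that case can be clamped:

```java
import java.util.concurrent.TimeUnit;

public class SaturatingDueNs {
    // Hypothetical sketch, not the PR's code. The toNanos conversion
    // saturates at Long.MAX_VALUE, so only the addition can overflow;
    // clamp a wrapped sum back to Long.MAX_VALUE.
    static long saturatingDueNs(long nowNs, long delayMs) {
        long delayNs = TimeUnit.MILLISECONDS.toNanos(delayMs); // saturating conversion
        long due = nowNs + delayNs;
        // Two non-negative operands whose sum wraps negative mean overflow.
        if (nowNs >= 0 && delayNs >= 0 && due < 0) {
            return Long.MAX_VALUE;
        }
        return due;
    }

    public static void main(String[] args) {
        System.out.println(saturatingDueNs(0L, 5L));                  // 5000000
        System.out.println(saturatingDueNs(Long.MAX_VALUE - 1L, 1L)); // clamped to Long.MAX_VALUE
    }
}
```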









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411863881



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
     val other = d.asInstanceOf[DelayedItem]
-    java.lang.Long.compare(dueMs, other.dueMs)
+    java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+    "DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   Thanks, I actually forgot the unit conversion here. I was thinking it'd 
still be nicer to print it in ms. What do you think?









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411863591



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0

Review comment:
   Thanks, that was a lazy code conversion









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858555



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1610,79 +1610,6 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
         )));
     }
 
-    @Test
-    public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
   I don't mean to totally cop out on this, but I think we should do this 
in a followup PR. I'll make a ticket and assign it to myself for later so I 
can't escape, but I don't even think it's worth marking it `@Ignore` for now.
   Tbh we should have removed it a while ago, rather than changing it over time 
to become its useless self today. It's a long history, and I'm mostly 
responsible, but just looking ahead the question now is: what do we even want 
to validate? The task assignor has no knowledge of version probing, and the 
partition assignor is not responsible for the task assignment (whereas it used 
to be with version probing, hence this test). What we should do is validate the 
inputs are being assembled sensibly during version probing.
   Anyways this will be really difficult to do just based on the final 
partition assignment, and even harder to distinguish a real failure from an 
unrelated one. So I'd propose to kick this into the future, when we embed the 
actual assignor class in the configs instead of this flag, and then pass in a 
`VersionProbingClientStatesValidatingAssignor` or whatever...SG?









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858808



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1610,79 +1610,6 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
         )));
     }
 
-    @Test
-    public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
   Probably a much longer answer than you ever wanted, but this test has 
been haunting me over many PRs  









[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2020-04-20 Thread Matthias J. Sax (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088262#comment-17088262 ]

Matthias J. Sax commented on KAFKA-9013:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5891/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/]

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:02 PM 

[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411847662



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)

Review comment:
   Should we handle the overflow?









[jira] [Created] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2020-04-20 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9897:
--

 Summary: Flaky Test 
StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
 Key: KAFKA-9897
 URL: https://issues.apache.org/jira/browse/KAFKA-9897
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 2.6.0
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
{quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
state store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
RUNNING at 
org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
 at 
org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
 at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9896) Flaky Test StandbyTaskEOSIntegrationTest#surviveWithOneTaskAsStandby[exactly_once_beta]

2020-04-20 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9896:
--

 Summary: Flaky Test 
StandbyTaskEOSIntegrationTest#surviveWithOneTaskAsStandby[exactly_once_beta]
 Key: KAFKA-9896
 URL: https://issues.apache.org/jira/browse/KAFKA-9896
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Affects Versions: 2.6.0
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1872/testReport/junit/org.apache.kafka.streams.integration/StandbyTaskEOSIntegrationTest/surviveWithOneTaskAsStandby_exactly_once_beta_/]
{quote}java.lang.AssertionError at org.junit.Assert.fail(Assert.java:87) at 
org.junit.Assert.assertTrue(Assert.java:42) at 
org.junit.Assert.assertTrue(Assert.java:53) at 
org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.surviveWithOneTaskAsStandby(StandbyTaskEOSIntegrationTest.java:113){quote}





[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411846617



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
     val other = d.asInstanceOf[DelayedItem]
-    java.lang.Long.compare(dueMs, other.dueMs)
+    java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+    "DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   typo: delayMs -> delayNs









[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411845809



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Is it necessary to accept Delayed type now?
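For reference, a minimal sketch of what dropping the `Delayed` interface could look like: with `getDelay` gone, a typed `Comparable` removes the `asInstanceOf` cast. This is illustrative Java (class name `DelayedItemSketch` is invented), not the PR's final Scala code.

```java
// Sketch: once getDelay is removed, the class need not implement
// java.util.concurrent.Delayed; a typed Comparable avoids the unchecked cast.
class DelayedItemSketch implements Comparable<DelayedItemSketch> {
    final long dueNs;

    DelayedItemSketch(long dueNs) {
        this.dueNs = dueNs;
    }

    @Override
    public int compareTo(DelayedItemSketch other) {
        // Compare absolute deadlines; earlier deadline sorts first.
        return Long.compare(dueNs, other.dueNs);
    }

    public static void main(String[] args) {
        System.out.println(new DelayedItemSketch(1L).compareTo(new DelayedItemSketch(2L)) < 0); // true
    }
}
```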

##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0

Review comment:
   How about dueNs > time.nanoseconds?
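The subtraction form is the one the `System.nanoTime` javadoc recommends: monotonic clock readings may wrap around `Long.MAX_VALUE`, and `t1 - t0 > 0` stays correct under two's-complement wraparound while the direct comparison `t1 > t0` does not. A small demonstration near the wrap point:

```java
public class NanoTimeComparison {
    public static void main(String[] args) {
        // Simulated clock readings just before the wrap point of a monotonic clock.
        long nowNs = Long.MAX_VALUE - 1L;
        long dueNs = nowNs + 10L; // wraps to a large negative value

        System.out.println(dueNs > nowNs);      // false: direct comparison is fooled by the wrap
        System.out.println(dueNs - nowNs > 0);  // true: the difference is still 10
    }
}
```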









[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on issue #8508:
URL: https://github.com/apache/kafka/pull/8508#issuecomment-616934105


   Java 8: 
`org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.surviveWithOneTaskAsStandby[exactly_once_beta]`
   Java 11: 
`org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication`
   Java 14: 
`org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores`
   
   Retest this please.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411841909



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-    private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
     final TaskId task;
-    final UUID source;
-    final UUID destination;
+    private final UUID destination;
 
-    TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+    TaskMovement(final TaskId task, final UUID destination) {
         this.task = task;
-        this.source = source;
         this.destination = destination;
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        final TaskMovement movement = (TaskMovement) o;
-        return Objects.equals(task, movement.task) &&
-               Objects.equals(source, movement.source) &&
-               Objects.equals(destination, movement.destination);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(task, source, destination);
-    }
-
     /**
-     * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
-     * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
-     * a few exceptional cases:
-     *
-     * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
-     * immediately from the source to the destination in the state constrained assignment
-     * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
-     * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
-     *
-     * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
-     * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
-     * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+     * @return whether any warmup replicas were assigned
      */
-    static List getMovements(final Map> statefulActiveTaskAssignment,
-                             final Map> balancedStatefulActiveTaskAssignment,
-                             final Map> tasksToCaughtUpClients,
-                             final Map clientStates,
-                             final Map tasksToRemainingStandbys,
-                             final int maxWarmupReplicas) {
-        if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-            throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-        }
+    static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+                                       final Map> tasksToCaughtUpClients,
+                                       final Map clientStates,
+                                       final Map tasksToRemainingStandbys,
+                                       final int maxWarmupReplicas) {
+        boolean warmupReplicasAssigned = false;
+
+        final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+            new ValidClientsByTaskLoadQueue(
+                clientStates,
+                (client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+            );
 
-        final Map taskToDestinationClient = new HashMap<>();
-        for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-            final UUID destination = clientEntry.getKey();
-            for (final TaskId task : clientEntry.getValue()) {
-                taskToDestinationClient.put(task, destination);
+        final SortedSet taskMovements = new TreeSet<>(
+            (movement, other) -> {
+                final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   

[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411840027



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with Logging {
    * The remaining delay time
    */
   def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+    unit.convert(getDelayMs, TimeUnit.MILLISECONDS)

Review comment:
   @ijuma I have removed getDelay and we no longer implement Delayed. In 
doing so I also switched the fetcher to a monotonic clock, as our existing 
implementation is dangerous.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
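The point in the comment above — that tracking a deadline with wall-clock milliseconds is dangerous, because the wall clock can jump backwards or forwards (NTP corrections, manual clock changes) — can be illustrated with a small stand-alone sketch. This is not the Kafka `DelayedItem` class itself; `MonotonicDelayedItem` is a hypothetical name, and the sketch uses `System.nanoTime()`, the JVM's monotonic clock, the same way the PR's new code uses `Time.nanoseconds`:

```java
import java.util.concurrent.TimeUnit;

// Hedged sketch of a monotonic-clock delayed item. System.nanoTime() is
// unaffected by wall-clock adjustments, unlike System.currentTimeMillis().
class MonotonicDelayedItem {
    private final long dueNs;

    MonotonicDelayedItem(final long delayMs) {
        // Record the deadline on the monotonic timeline.
        this.dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    boolean isDelayed() {
        // Compare via subtraction so the check stays correct even if
        // nanoTime() wraps around Long.MAX_VALUE.
        return dueNs - System.nanoTime() > 0;
    }

    public static void main(final String[] args) throws InterruptedException {
        final MonotonicDelayedItem item = new MonotonicDelayedItem(50);
        System.out.println(item.isDelayed()); // inside the 50 ms window
        Thread.sleep(120);
        System.out.println(item.isDelayed()); // window has elapsed
    }
}
```

The subtraction idiom (`dueNs - now > 0` rather than `dueNs > now`) is the form the `java.lang.System.nanoTime` javadoc recommends, because `nanoTime` values may be negative and may overflow.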




[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411828565



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-    private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
     final TaskId task;
-    final UUID source;
-    final UUID destination;
+    private final UUID destination;
 
-    TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+    TaskMovement(final TaskId task, final UUID destination) {
         this.task = task;
-        this.source = source;
         this.destination = destination;
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        final TaskMovement movement = (TaskMovement) o;
-        return Objects.equals(task, movement.task) &&
-               Objects.equals(source, movement.source) &&
-               Objects.equals(destination, movement.destination);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(task, source, destination);
-    }
-
     /**
-     * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
-     * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
-     * a few exceptional cases:
-     *
-     * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
-     * immediately from the source to the destination in the state constrained assignment
-     * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
-     * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
-     *
-     * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
-     * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
-     * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+     * @return whether any warmup replicas were assigned
      */
-    static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-                                           final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment,
-                                           final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-                                           final Map<UUID, ClientState> clientStates,
-                                           final Map<TaskId, Integer> tasksToRemainingStandbys,
-                                           final int maxWarmupReplicas) {
-        if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-            throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-        }
+    static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
+                                       final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                       final Map<UUID, ClientState> clientStates,
+                                       final Map<TaskId, Integer> tasksToRemainingStandbys,
+                                       final int maxWarmupReplicas) {
+        boolean warmupReplicasAssigned = false;
+
+        final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+            new ValidClientsByTaskLoadQueue(
+                clientStates,
+                (client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+            );
 
-        final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-        for (final Map.Entry<UUID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-            final UUID destination = clientEntry.getKey();
-            for (final TaskId task : clientEntry.getValue()) {
-                taskToDestinationClient.put(task, destination);
+        final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+            (movement, other) -> {
+                final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+                final int 
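The quoted diff (cut off by the archive) builds a `TreeSet` whose comparator orders candidate movements so that tasks with the fewest caught-up clients are first in line for a warmup replica. A simplified, self-contained Java sketch of that ordering idea follows; the names `Movement`, `sortedMovements`, and the `String` task/client ids are illustrative stand-ins, not the PR's actual API:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hedged sketch: a TreeSet with a comparator acts as a priority queue of
// candidate task movements. The tie-break on the task id keeps the comparator
// consistent with equality, which TreeSet requires so that distinct movements
// with the same caught-up-client count are not silently dropped.
public class MovementOrdering {
    record Movement(String task, String destination) {}

    static SortedSet<Movement> sortedMovements(final Map<String, Set<String>> tasksToCaughtUpClients,
                                               final List<Movement> candidates) {
        final SortedSet<Movement> ordered = new TreeSet<>(
            Comparator.<Movement>comparingInt(
                    m -> tasksToCaughtUpClients.getOrDefault(m.task(), Set.of()).size())
                .thenComparing(Movement::task));
        ordered.addAll(candidates);
        return ordered;
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> caughtUp = Map.of(
            "0_0", Set.of("clientA", "clientB"),
            "0_1", Set.of("clientA"));
        final SortedSet<Movement> ordered = sortedMovements(
            caughtUp,
            List.of(new Movement("0_0", "clientC"), new Movement("0_1", "clientC")));
        // 0_1 has fewer caught-up clients, so it is first in line for a warmup replica.
        System.out.println(ordered.first().task()); // prints "0_1"
    }
}
```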

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411828397



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r41182



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

Review comment:
   Yeah I 

[jira] [Commented] (KAFKA-9868) Flaky Test EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore

2020-04-20 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088233#comment-17088233
 ] 

Boyang Chen commented on KAFKA-9868:


[https://github.com/apache/kafka/pull/8522]

> Flaky Test 
> EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore
> ---
>
> Key: KAFKA-9868
> URL: https://issues.apache.org/jira/browse/KAFKA-9868
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Sophie Blee-Goldman
>Assignee: Boyang Chen
>Priority: Major
>  Labels: flaky-test
>
> h3. Error Message
> java.lang.AssertionError: Condition not met within timeout 15000. Expected 
> ERROR state but driver is on RUNNING
>  
> From what we observed, in unit tests with transactions turned on, it takes a
> long time to bootstrap the test, and system tests sometimes run into too many
> open files. To reduce the start time and make tests less flaky, we should set
> the number of transaction log partitions to a much smaller number than 50.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
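The fix being proposed — shrinking the transaction state log for embedded test brokers — comes down to overriding a few broker configs. A hedged sketch of what such an override might look like (the class name `EmbeddedBrokerConfig` is hypothetical; `transaction.state.log.num.partitions` is the real Kafka broker config, whose default of 50 is what makes test bootstrap slow):

```java
import java.util.Properties;

// Hedged sketch: broker property overrides to shrink the transaction state
// log in an embedded test cluster. Creating all 50 default partitions of
// __transaction_state at startup is the bootstrap cost being avoided.
public class EmbeddedBrokerConfig {
    static Properties brokerProps() {
        final Properties props = new Properties();
        // One partition is enough for a single-broker test cluster.
        props.put("transaction.state.log.num.partitions", "1");
        // The topic's durability settings must also fit a one-broker cluster.
        props.put("transaction.state.log.replication.factor", "1");
        props.put("transaction.state.log.min.isr", "1");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(
            brokerProps().getProperty("transaction.state.log.num.partitions")); // prints "1"
    }
}
```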


[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616917436


   Got 2/3 green, one failed test is due to
   ```
   kafka.api.SaslSslConsumerTest.testCoordinatorFailover
   
   java.lang.AssertionError: expected: but 
was:
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411826785



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411826782



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

Review comment:
   Yep, 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411825756



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411821268



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
+   final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+   final Map<UUID, ClientState> clientStates,
+   final Map<TaskId, Integer> tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-for (final Map.Entry<UUID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: avoid unnecessary delay conversion in isDelayed check

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411821681



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with 
Logging {
* The remaining delay time
*/
   def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+unit.convert(getDelayMs, TimeUnit.MILLISECONDS)

Review comment:
   Sure thing. I'll drop it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
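
The pattern under discussion in this thread (replacing the wall-clock, `Delayed`-based `getDelay` computation with a monotonic-clock check) can be sketched as follows. The class below is an illustrative standalone sketch, not Kafka's actual `DelayedItem`:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the monotonic-clock pattern discussed in this thread: capture the
// deadline once using System.nanoTime() and compare signed durations, so the
// check is unaffected by wall-clock (NTP) adjustments. The class name is
// illustrative; this is not Kafka's actual DelayedItem.
class MonotonicDelayedItem {
    private final long dueNs;

    MonotonicDelayedItem(final long delayMs) {
        this.dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // dueNs - now is a signed duration, which stays correct even if
    // System.nanoTime() wraps around Long.MAX_VALUE, whereas comparing the
    // absolute timestamps (dueNs > now) would not.
    boolean isDelayed() {
        return dueNs - System.nanoTime() > 0;
    }
}
```

A boolean `isDelayed` check like this also avoids the unit conversion that `getDelay(TimeUnit)` performs on every call, which is the point raised in the review above.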




[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411821268



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- *
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-   final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment,
-   final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-   final Map<UUID, ClientState> clientStates,
-   final Map<TaskId, Integer> tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
+   final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+   final Map<UUID, ClientState> clientStates,
+   final Map<TaskId, Integer> tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-for (final Map.Entry<UUID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 
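
The comparator being built in the truncated diff above orders candidate movements by how constrained each task is, so that tasks with the fewest caught-up clients get first claim on the limited warmup replicas. A self-contained sketch of that ordering idea, using illustrative names and plain String task ids rather than Kafka's actual types:

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Illustrative sketch (not Kafka's code): order candidate task movements so
// that the most constrained tasks, those with the fewest caught-up clients,
// are considered first when handing out the limited warmup replicas.
class TaskMovementOrderingSketch {
    record Movement(String task, String destination) { }

    static SortedSet<Movement> orderByConstraint(final Map<String, Set<String>> tasksToCaughtUpClients) {
        return new TreeSet<>(
            Comparator.<Movement>comparingInt(m -> tasksToCaughtUpClients.get(m.task()).size())
                      .thenComparing(Movement::task)); // tie-break so the ordering is total
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> caughtUp = Map.of(
            "0_0", Set.of("clientA", "clientB"),
            "0_1", Set.of("clientA"));
        final SortedSet<Movement> movements = orderByConstraint(caughtUp);
        movements.add(new Movement("0_0", "clientC"));
        movements.add(new Movement("0_1", "clientC"));
        // "0_1" has only one caught-up client, so it is ordered first.
        System.out.println(movements.first().task()); // prints 0_1
    }
}
```

Note the tie-breaker: without it, a TreeSet comparator that returns 0 for two distinct movements would silently drop one of them.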


[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411818919



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411818806



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[jira] [Created] (KAFKA-9895) Truncation request on broker start up may cause OffsetOutOfRangeException

2020-04-20 Thread Boquan Tang (Jira)
Boquan Tang created KAFKA-9895:
--

 Summary: Truncation request on broker start up may cause 
OffsetOutOfRangeException
 Key: KAFKA-9895
 URL: https://issues.apache.org/jira/browse/KAFKA-9895
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Boquan Tang


We have a 4 broker cluster running version 2.4.0.
Upon broker restart, we frequently observe issue like this:
{code}
[2020-04-20 20:36:37,827] ERROR [ReplicaFetcher replicaId=4, leaderId=1, 
fetcherId=0] Unexpected error occurred during truncation for topic-name-10 at 
offset 632111354 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.OffsetOutOfRangeException: Received request for 
offset 632111355 for partition active-ads-10, but we only have log segments in 
the range 0 to 632111354.
{code}

The partition experiencing this issue seems random. Could we actually ignore this kind of error and not take the partition offline? From what the error log describes, I think once start-up finishes and the partition catches up with the leader, it should be OK to put it back into the ISR. Please correct me if I'm understanding it incorrectly.

This started happening after we upgraded to 2.4.0, so I'm wondering whether it is related to this specific version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
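
For illustration, the condition behind the reported exception (a requested offset falling outside the range of available log segments) can be sketched as a simple inclusive range check. The names and semantics below are simplified assumptions for the sketch, not Kafka's actual log implementation:

```java
// Hypothetical sketch of the range check behind the reported error: a
// requested offset must lie within [logStartOffset, logEndOffset]. The
// numbers in main() mirror the log line quoted in the report.
class OffsetRangeCheckSketch {
    static boolean inRange(final long requested, final long logStartOffset, final long logEndOffset) {
        return requested >= logStartOffset && requested <= logEndOffset;
    }

    public static void main(final String[] args) {
        // The broker had segments covering 0 to 632111354 but was asked for
        // offset 632111355, one past the reported upper bound, hence the
        // OffsetOutOfRangeException in the log above.
        System.out.println(inRange(632_111_355L, 0L, 632_111_354L)); // prints false
    }
}
```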


[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411815724



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
Review comment:
   I 
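The quoted diff breaks off inside the TreeSet comparator that orders candidate movements by how many clients are caught up on each task. A rough, standalone illustration of that kind of ordering follows; the Movement record, string task ids, and the ascending priority are assumptions for this sketch, not the PR's actual code:

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

public class MovementOrdering {
    // Hypothetical stand-in for a Streams TaskId, rendered as "group_partition".
    record Movement(String task) {}

    // Order movements by how many clients are already caught up on the task
    // (ascending), breaking ties on the task id so the ordering is total.
    static Comparator<Movement> byCaughtUpClients(final Map<String, Set<UUID>> tasksToCaughtUpClients) {
        return Comparator
            .comparingInt((Movement m) -> tasksToCaughtUpClients.get(m.task()).size())
            .thenComparing(Movement::task);
    }

    public static void main(String[] args) {
        UUID c1 = UUID.randomUUID();
        UUID c2 = UUID.randomUUID();
        Map<String, Set<UUID>> caughtUp = Map.of(
            "0_0", Set.of(c1, c2),   // two caught-up clients
            "0_1", Set.of(),         // no caught-up clients
            "1_0", Set.of(c1)        // one caught-up client
        );
        SortedSet<Movement> movements = new TreeSet<>(byCaughtUpClients(caughtUp));
        movements.add(new Movement("0_0"));
        movements.add(new Movement("0_1"));
        movements.add(new Movement("1_0"));
        // Tasks with the fewest caught-up clients sort first.
        System.out.println(movements.stream().map(Movement::task).toList());
        // prints [0_1, 1_0, 0_0]
    }
}
```

The tie-break on the task id matters: a TreeSet treats `compare() == 0` as a duplicate element, so the comparator must impose a total order or movements would silently be dropped.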

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411815738



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411814466



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813832



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813351



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813053



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

Review comment:
   I might 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411812127



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411810700



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088201#comment-17088201
 ] 

John Roesler commented on KAFKA-9224:
-

Thanks [~mjsax] ,

This is a really good point. Actually, we face the same challenge with ALOS 
(at-least-once semantics). For some reason, we decided just to shrug it off and 
let state under ALOS become corrupted during crash recovery. I've never been 
convinced this makes 
sense, and I secretly consider it a bug to be fixed someday. So adding a 
"pending write buffer" would be an equally good idea under ALOS, and it would 
also let us roll back the store to the committed offset for correct 
re-processing. Incidentally, then, the "query_committed|query_uncommitted" IQ 
option would also be applicable under ALOS.
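The "pending write buffer" idea in this comment can be illustrated with a toy store. This is a hypothetical sketch, not the Kafka Streams implementation: writes go to a buffer that only becomes visible to committed reads after commit() and is discarded on abort, which is what would allow rolling the store back to the committed offset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a store with a pending write buffer. Uncommitted
// writes live in `pending`; committed reads never see them until commit().
class TransactionalStore {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<String, Long> pending = new HashMap<>();

    void put(final String key, final Long value) {
        pending.put(key, value);          // buffered, not visible to committed reads
    }

    Long getCommitted(final String key) { // "query_committed" semantics
        return committed.get(key);
    }

    Long getUncommitted(final String key) { // "query_uncommitted": overlay pending writes
        return pending.containsKey(key) ? pending.get(key) : committed.get(key);
    }

    void commit() {                       // transaction commit: publish buffered writes
        committed.putAll(pending);
        pending.clear();
    }

    void abort() {                        // crash/abort: roll back to committed state
        pending.clear();
    }
}
```

Under this model, a crash between put() and commit() simply drops the pending buffer, so the store stays consistent with the committed offset under EOS and ALOS alike.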

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means interactive 
> query could see uncommitted data within state store which is not ideal for 
> users relying on state stores for strong consistency. Ideally, we should have 
> an option to include state store commit as part of ongoing transaction, 
> however an immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-20 Thread Jesse Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088199#comment-17088199
 ] 

Jesse Anderson commented on KAFKA-9882:
---

The use case is whenever you have to call a KafkaConsumer method that requires 
a 
java.util.Set<[TopicPartition|https://kafka.apache.org/20/javadoc/org/apache/kafka/common/TopicPartition.html]>.
 These would be methods such as seekToEnd and seekToBeginning.

To use either of these methods with a newly created KafkaConsumer instance, you 
have to poll and then call assignment() in a while loop and check for the size.

In practice, having to call poll() just so that assignment() returns a 
non-empty set doesn't make sense either.
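The poll-and-check loop described above looks roughly like the following sketch. ConsumerLike and AssignmentWaiter are hypothetical names standing in for the two KafkaConsumer calls involved, so the pattern can be shown self-contained; with a real consumer, pollOnce() would be consumer.poll(Duration.ofMillis(100)) and assignment() would be consumer.assignment().

```java
import java.util.Set;

// Sketch of the current workaround: poll in a loop until assignment() is
// non-empty, after which seekToBeginning/seekToEnd can be called safely.
interface ConsumerLike {
    void pollOnce();          // stands in for consumer.poll(...); triggers the group join
    Set<String> assignment(); // stands in for consumer.assignment(); empty at first
}

class AssignmentWaiter {
    // Block until the consumer has a non-empty assignment, then return it.
    static Set<String> awaitAssignment(final ConsumerLike consumer) {
        while (consumer.assignment().isEmpty()) {
            consumer.pollOnce();
        }
        return consumer.assignment();
    }
}
```

The busy loop is exactly what makes the workaround unsatisfying, which is the motivation for a proper blocking method on the consumer itself.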

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411799045



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> 
statefulActiveTaskAssignment,
-   final Map<UUID, List<TaskId>> 
balancedStatefulActiveTaskAssignment,
-   final Map<TaskId, SortedSet<UUID>> 
tasksToCaughtUpClients,
-   final Map<UUID, ClientState> 
clientStates,
-   final Map<TaskId, Integer> 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map<UUID, List<TaskId>> 
statefulActiveTaskAssignment,
+   final Map<TaskId, SortedSet<UUID>> 
tasksToCaughtUpClients,
+   final Map<UUID, ClientState> 
clientStates,
+   final Map<TaskId, Integer> 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-for (final Map.Entry<UUID, List<TaskId>> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 
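For context, the assignment logic under discussion (quoted, and truncated, above) can be sketched as follows. This is a simplified, hypothetical model of the KIP-441 idea, not the actual TaskMovement code: a task whose balanced destination is already caught up is moved immediately, otherwise the destination gets a warmup replica, bounded by maxWarmupReplicas.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hedged sketch of the KIP-441 movement idea (String ids used instead of
// TaskId/UUID for self-containment; all names here are illustrative).
class TaskMovementSketch {
    static boolean assignMovements(final Map<String, String> stateConstrained, // task -> current client
                                   final Map<String, String> balanced,         // task -> desired client
                                   final Map<String, Set<String>> caughtUpClients,
                                   final Map<String, Set<String>> warmups,     // client -> warmup tasks (output)
                                   final int maxWarmupReplicas) {
        int used = 0;
        for (final Map.Entry<String, String> e : balanced.entrySet()) {
            final String task = e.getKey();
            final String destination = e.getValue();
            final String source = stateConstrained.get(task);
            if (destination.equals(source)) {
                continue;                                  // already where it should be
            }
            if (caughtUpClients.getOrDefault(task, Set.of()).contains(destination)) {
                stateConstrained.put(task, destination);   // destination caught up: move now
            } else if (used < maxWarmupReplicas) {
                warmups.computeIfAbsent(destination, c -> new HashSet<>()).add(task);
                used++;                                    // warm up state on the destination first
            }
        }
        return used > 0;                                   // whether any warmup replicas were assigned
    }
}
```

This matches the new method's return contract ("whether any warmup replicas were assigned") but omits standby accounting and the load-based ordering that the real comparator above provides.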

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411797135



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] highluck commented on a change in pull request #8114: KAFKA-9290: Update IQ related JavaDocs

2020-04-20 Thread GitBox


highluck commented on a change in pull request #8114:
URL: https://github.com/apache/kafka/pull/8114#discussion_r411791026



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -208,13 +208,13 @@
  * streamBuilder.table(topic, Consumed.with(Serde.String(), 
Serde.String()), Materialized.as(storeName))
  * }
  * 
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
  * {@link KafkaStreams#store(StoreQueryParameters) 
KafkaStreams#store(...)}:
  * {@code
  * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore<String, Long> localStore = 
streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local 
(application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> localStore = 
streams.store(queryableStoreName, QueryableStoreTypes.<String, ValueAndTimestamp<Long>>timestampedKeyValueStore());

Review comment:
   ok! thanks :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411787822



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   Well, 

[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: avoid unnecessary delay conversion in isDelayed check

2020-04-20 Thread GitBox


ijuma commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411779855



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with 
Logging {
* The remaining delay time
*/
   def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+unit.convert(getDelayMs, TimeUnit.MILLISECONDS)

Review comment:
   It seems like it's only used in a test. So I suggest we remove it (as 
well as the Delayed implementation).
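The DelayedItem change being reviewed in this thread tracks its deadline with a monotonic clock. A minimal standalone Java sketch of that approach (using System.nanoTime directly instead of Kafka's Time abstraction, which the real class takes for testability):

```java
import java.util.concurrent.TimeUnit;

// Sketch of a monotonic-clock delay: the due time is a nanosecond deadline
// from System.nanoTime, so it is immune to wall-clock adjustments.
class DelayedItem {
    private final long dueNs;

    DelayedItem(final long delayMs) {
        this.dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // true while the deadline is still in the future (subtraction handles
    // nanoTime overflow correctly, unlike a direct comparison)
    boolean isDelayed() {
        return dueNs - System.nanoTime() > 0;
    }
}
```

Exposing only isDelayed(), rather than a remaining-delay getter, also sidesteps the unit-conversion question raised in this comment.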





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411779358



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ijuma commented on issue #8517: MINOR: avoid unnecessary delay conversion in isDelayed check

2020-04-20 Thread GitBox


ijuma commented on issue #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-616875521


   Unrelated flaky test:
   
   > 
org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_beta]
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411743597



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-for (final Map.Entry<UUID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411739644



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-   final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment,
-   final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-   final Map<UUID, ClientState> clientStates,
-   final Map<TaskId, Integer> tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
+   final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+   final Map<UUID, ClientState> clientStates,
+   final Map<TaskId, Integer> tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-for (final Map.Entry<UUID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 
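The movement rules described in the Javadoc above (move a task immediately when its destination client is caught up or its source client is not; otherwise create a warmup replica, subject to {@code max.warmup.replicas}) can be sketched as a standalone predicate. This is an illustrative sketch only; the class, method, and the use of String task ids are hypothetical stand-ins, not Kafka Streams internals:

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hedged sketch of the movement rule from the Javadoc above.
public class MovementRuleSketch {
    // A task moves immediately (no warmup replica needed) when the
    // destination already has caught-up state, or when the source itself
    // is not caught up (so there is no state advantage worth warming up).
    public static boolean movesImmediately(final String task,
                                           final UUID source,
                                           final UUID destination,
                                           final Map<String, Set<UUID>> tasksToCaughtUpClients) {
        final Set<UUID> caughtUp = tasksToCaughtUpClients.getOrDefault(task, Set.of());
        return caughtUp.contains(destination) || !caughtUp.contains(source);
    }
}
```

Any task for which this predicate is false is the case the thread discusses: it stays on the (caught-up) source for now and a warmup replica is placed on the destination instead.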

[GitHub] [kafka] mjsax commented on issue #8114: KAFKA-9290: Update IQ related JavaDocs

2020-04-20 Thread GitBox


mjsax commented on issue #8114:
URL: https://github.com/apache/kafka/pull/8114#issuecomment-616848801


   Ah. Thanks for pointing out the hotfix @highluck. Can you address Sophie's 
comment?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088143#comment-17088143
 ] 

Matthias J. Sax commented on KAFKA-9224:


What you say makes sense. However, I believe that this ticket itself is still 
valuable because the fix would also make EOS more efficient. Atm, in case of 
failure in EOS mode we need to wipe out the entire store as we cannot roll it 
back. If we would introduce a "pending write buffer" (that we never query via 
IQ), we can just "drop" the content of the write buffer and don't need to wipe 
out the store if a TX is aborted. This is an improvement we want for efficiency 
reasons anyway and it would address this ticket "on the side".

Btw: I could even imagine adding a config for "query_committed" vs 
"query_uncommitted" for IQ if anybody would want such a feature. In 
"query_uncommitted" mode we would first query the "write buffer" and, if nothing is 
there, query the store.

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means interactive 
> query could see uncommitted data within state store which is not ideal for 
> users relying on state stores for strong consistency. Ideally, we should have 
> an option to include state store commit as part of ongoing transaction, 
> however an immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
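The "pending write buffer" idea from the comments above can be sketched as a tiny in-memory store wrapper: writes land in a buffer, commit() flushes them to the backing map, and abort() simply drops them, so queries against committed state never observe in-flight data and no store wipe is needed on abort. All names and the String-keyed map are hypothetical illustrations, not a Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of a transactional state store with a pending write buffer.
public class TransactionalStoreSketch {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> pending = new HashMap<>();

    // Writes go to the buffer, never directly to the queryable store.
    public void put(final String key, final String value) {
        pending.put(key, value);
    }

    // "query_committed": sees only flushed data.
    public String get(final String key) {
        return committed.get(key);
    }

    // "query_uncommitted": checks the write buffer first, then the store.
    public String getUncommitted(final String key) {
        return pending.containsKey(key) ? pending.get(key) : committed.get(key);
    }

    public void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    // On abort, just drop the buffer -- no need to wipe the store.
    public void abort() {
        pending.clear();
    }
}
```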


[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411734468



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -89,95 +88,72 @@ public boolean assign() {
 return false;
 }
 
-final Map<UUID, List<TaskId>> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map<UUID, List<TaskId>> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map<UUID, List<TaskId>> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+final Map<TaskId, Integer> tasksToRemainingStandbys =
+statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
 
-//  Stateful Active Tasks  //
+final boolean followupRebalanceNeeded = 
assignStatefulActiveTasks(tasksToRemainingStandbys);
 
-final Map<UUID, List<TaskId>> statefulActiveTaskAssignment =
-new DefaultStateConstrainedBalancedAssignor().assign(
-statefulTasksToRankedCandidates,
-configs.balanceFactor,
-sortedClients,
-clientsToNumberOfThreads,
-tasksToCaughtUpClients
-);
+assignStandbyReplicaTasks(tasksToRemainingStandbys);
+
+assignStatelessActiveTasks();
 
-//  Warmup Replica Tasks  //
+return followupRebalanceNeeded;
+}
 
-final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment =
+private boolean assignStatefulActiveTasks(final Map<TaskId, Integer> tasksToRemainingStandbys) {
+final Map<UUID, List<TaskId>> statefulActiveTaskAssignment =
 new DefaultBalancedAssignor().assign(
 sortedClients,
 statefulTasks,
 clientsToNumberOfThreads,
 configs.balanceFactor);
 
-final Map<TaskId, Integer> tasksToRemainingStandbys =
-statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
-
-final List<TaskMovement> movements = getMovements(
+return assignTaskMovements(
 statefulActiveTaskAssignment,
-balancedStatefulActiveTaskAssignment,
 tasksToCaughtUpClients,
 clientStates,
 tasksToRemainingStandbys,
-configs.maxWarmupReplicas);
-
-for (final TaskMovement movement : movements) {
-warmupTaskAssignment.get(movement.destination).add(movement.task);
-}
-
-//  Standby Replica Tasks  //
-
-final List<Map<UUID, List<TaskId>>> allTaskAssignmentMaps = asList(
-statefulActiveTaskAssignment,
-warmupTaskAssignment,
-standbyTaskAssignment,
-statelessActiveTaskAssignment
+configs.maxWarmupReplicas
 );
+}
 
-final ValidClientsByTaskLoadQueue<UUID> clientsByStandbyTaskLoad =
-new ValidClientsByTaskLoadQueue<>(
-getClientPriorityQueueByTaskLoad(allTaskAssignmentMaps),
-allTaskAssignmentMaps
+private void assignStandbyReplicaTasks(final Map<TaskId, Integer> tasksToRemainingStandbys) {
+final ValidClientsByTaskLoadQueue standbyTaskClientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> !clientStates.get(client).assignedTasks().contains(task)
 );
+standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
 
 for (final TaskId task : statefulTasksToRankedCandidates.keySet()) {
 final int numRemainingStandbys = 
tasksToRemainingStandbys.get(task);
-final List<UUID> clients = clientsByStandbyTaskLoad.poll(task, numRemainingStandbys);
+final List<UUID> clients = standbyTaskClientsByTaskLoad.poll(task, numRemainingStandbys);
 for (final UUID client : clients) {
-standbyTaskAssignment.get(client).add(task);
+clientStates.get(client).assignStandby(task);
 }
-clientsByStandbyTaskLoad.offer(clients);
+standbyTaskClientsByTaskLoad.offerAll(clients);
+
 final int numStandbysAssigned = clients.size();
-if (numStandbysAssigned < configs.numStandbyReplicas) {
+if (numStandbysAssigned < numRemainingStandbys) {
 log.warn("Unable to assign {} of {} standby tasks for task 
[{}]. " +
  "There is not enough available capacity. You 
should " +
  "increase the number of threads and/or 
application instances " +
  "to maintain the requested number of standby 
replicas.",
-configs.numStandbyReplicas - numStandbysAssigned, 
configs.numStandbyReplicas, task);
+ numRemainingStandbys - numStandbysAssigned, 
configs.numStandbyReplicas, task);
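The poll/offerAll pattern in the diff above (take the n least-loaded clients that are valid for a task, then return them to the pool) can be sketched with a plain priority queue. This is a simplified illustration of the idea behind ValidClientsByTaskLoadQueue, not its actual implementation; the class name, String task ids, and fixed task counts are assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.function.BiPredicate;

// Hedged sketch: clients ordered by current task load, polled per task
// with a validity predicate (e.g. "does not already host this task").
public class ClientsByLoadSketch {
    private final BiPredicate<UUID, String> validForTask;
    private final PriorityQueue<UUID> queue;

    public ClientsByLoadSketch(final Map<UUID, Integer> taskCounts,
                               final BiPredicate<UUID, String> validForTask) {
        this.validForTask = validForTask;
        // Min-heap by task count: least-loaded client comes out first.
        this.queue = new PriorityQueue<>((a, b) -> Integer.compare(taskCounts.get(a), taskCounts.get(b)));
        queue.addAll(taskCounts.keySet());
    }

    // Return up to n least-loaded clients valid for this task; clients that
    // fail the predicate are re-offered so they stay available for other tasks.
    public List<UUID> poll(final String task, final int n) {
        final List<UUID> result = new ArrayList<>();
        final List<UUID> skipped = new ArrayList<>();
        while (result.size() < n && !queue.isEmpty()) {
            final UUID client = queue.poll();
            if (validForTask.test(client, task)) {
                result.add(client);
            } else {
                skipped.add(client);
            }
        }
        queue.addAll(skipped);
        return result;
    }

    // Mirrors offerAll in the diff: callers return polled clients to the pool.
    public void offerAll(final List<UUID> clients) {
        queue.addAll(clients);
    }
}
```

Note the real queue re-sorts as assignments change a client's load; this sketch assumes static counts for brevity.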

[GitHub] [kafka] LiamClarkeNZ commented on issue #8520: Add explicit grace period to tumbling window example

2020-04-20 Thread GitBox


LiamClarkeNZ commented on issue #8520:
URL: https://github.com/apache/kafka/pull/8520#issuecomment-616842780


   Yep, fine by me. :)
   
   On Tue, Apr 21, 2020 at 9:32 AM John Roesler 
   wrote:
   
   > *@vvcephei* commented on this pull request.
   >
   > Hey @LiamClarkeNZ  , thanks for the docs
   > improvement! Everything looks good to me, except I'd request we remove
   > L3267. What do you think?
   > --
   >
   > In docs/streams/developer-guide/dsl-api.html
   > :
   >
   > > @@ -3262,12 +3262,15 @@ KTable-KTable Foreign-Key
   >  import org.apache.kafka.streams.kstream.TimeWindows;
   >
   >  // A tumbling time window with a size of 5 minutes (and, 
by definition, an implicit
   > -// advance interval of 5 minutes).
   > +// advance interval of 5 minutes). Note the explicit 
grace period, as the current
   > +// default value is 24 hours, which may be larger than 
needed for smaller windows. 
   > +// Note that this default may change in future major 
version releases.
   >
   > I'd remove the note about changing the default out. We can either find a
   > non-breaking way to change defaults (see
   > 
https://issues.apache.org/jira/browse/KAFKA-8924?focusedCommentId=17088091=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17088091
   > ), or we can't change it.
   >
   > I'd rather not make people paranoid that we're going to break their app
   > semantics from underneath them, because we won't.
   >



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616840036


   @mjsax I suppose this should help for all integration tests, as they share 
`EmbeddedBroker`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2020-04-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088093#comment-17088093
 ] 

John Roesler edited comment on KAFKA-8924 at 4/20/20, 10:14 PM:


Hey [~atais] , it's been quite a while since the last activity on this ticket. 
I've just had a thought that unblocks us to fix it immediately in the next 
release (see my last comment). Do you still want to work on this?


was (Author: vvcephei):
Hey [~atais] , it's been quite a while since the last activity on this ticket. 
I've just had a thought that unblocks us to fix it immediately in the next 
release (see my last comment). Do you still want to fix on this?

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project; however, I am not aware of the impact it would have due to 
> the changes in the *gracePeriodMs* method mentioned before.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
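The surprising 24-hour delay described in this ticket is just the fallback arithmetic in the quoted gracePeriodMs method: with graceMs == -1, the effective grace period becomes maintainMs() - size(), i.e. roughly a day under the default retention. A standalone sketch of that computation (the class and method names here are illustrative, mirroring but not reproducing TimeWindows):

```java
// Hedged sketch of the default-grace fallback discussed in KAFKA-8924.
public class GracePeriodSketch {
    // DEFAULT_RETENTION_MS in the quoted code corresponds to 24 hours.
    static final long DEFAULT_RETENTION_MS = 24 * 60 * 60 * 1000L;

    // graceMs == -1 falls back to (retention - window size), so a 5-minute
    // window gets ~23h55m of grace, and suppress(untilWindowCloses(...))
    // emits nothing until that grace period has elapsed.
    static long effectiveGraceMs(final long graceMs, final long sizeMs) {
        return graceMs != -1 ? graceMs : DEFAULT_RETENTION_MS - sizeMs;
    }
}
```

This is why the docs discussion above recommends an explicit grace period (e.g. zero) for small windows used with final-results suppression.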


[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616839537


   Ran it 350 times locally with no failures.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on a change in pull request #8522:
URL: https://github.com/apache/kafka/pull/8522#discussion_r411727585



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -84,7 +84,7 @@ public EmbeddedKafkaCluster(final int numBrokers,
 /**
  * Creates and starts a Kafka cluster.
  */
-public void start() throws IOException, InterruptedException {
+public void start() throws IOException {

Review comment:
   Lol, makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9894) Support Java 14

2020-04-20 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-9894.

Resolution: Fixed

Fixed via https://github.com/apache/kafka/pull/8519. Jenkins jobs for trunk and PRs have 
been added.

> Support Java 14
> ---
>
> Key: KAFKA-9894
> URL: https://issues.apache.org/jira/browse/KAFKA-9894
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.6.0
>
>
> There have been 3 releases since Java 11, so we are halfway to the next 
> long-term support release (Java 17). It's a good time for Apache Kafka to 
> start testing with post Java 11 JDKs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9224) State store should not see uncommitted transaction result

2020-04-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088126#comment-17088126
 ] 

John Roesler commented on KAFKA-9224:
-

Thanks for the points, all.

I think my objection is partly practical in nature. We could opt for a single 
solution to allow monotonic queries against any combination of actives or 
standbys, under both EOS or ALOS. But this ticket proposes to introduce code 
that would fix it only when querying actives under EOS. That means someone 
still has to come along later and completely implement monotonic queries. And 
at that point, we could remove the code that fixes it as proposed in this 
ticket. Plus, in the mean time, it's going to become yet more code that is 
special to EOS, more conditionals to test and maintain, more divergent behavior 
to support.

So I guess, as long as no one is actually going to work on this ticket in the 
near term, and as long as they consider the full potential scope of the 
solution when they do pick it up, then I have no objection to this ticket's 
existence. At least it will serve as documentation if anyone does happen to 
expect _not_ to see in-flight transactions in IQs of active replicas under EOS. 
For the sake of brevity, I'll elide further philosophical arguments.

> State store should not see uncommitted transaction result
> -
>
> Key: KAFKA-9224
> URL: https://issues.apache.org/jira/browse/KAFKA-9224
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Currently under EOS, the uncommitted write could be reflected in the state 
> store before the ongoing transaction is finished. This means interactive 
> query could see uncommitted data within state store which is not ideal for 
> users relying on state stores for strong consistency. Ideally, we should have 
> an option to include state store commit as part of ongoing transaction, 
> however an immediate step towards a better reasoned system is to `write after 
> transaction commit`, which means we always buffer data within stream cache 
> for EOS until the ongoing transaction is committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9894) Support Java 14

2020-04-20 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9894:
---
Description: 
There have been 3 releases since Java 11, so we are halfway to the next 
long-term support release (Java 17). It's a good time for Apache Kafka to start 
testing with post Java 11 JDKs.
 

> Support Java 14
> ---
>
> Key: KAFKA-9894
> URL: https://issues.apache.org/jira/browse/KAFKA-9894
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.6.0
>
>
> There have been 3 releases since Java 11, so we are halfway to the next 
> long-term support release (Java 17). It's a good time for Apache Kafka to 
> start testing with post Java 11 JDKs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9894) Support Java 14

2020-04-20 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-9894:
--

 Summary: Support Java 14
 Key: KAFKA-9894
 URL: https://issues.apache.org/jira/browse/KAFKA-9894
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 2.6.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


mjsax commented on a change in pull request #8522:
URL: https://github.com/apache/kafka/pull/8522#discussion_r411719647



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -84,7 +84,7 @@ public EmbeddedKafkaCluster(final int numBrokers,
 /**
  * Creates and starts a Kafka cluster.
  */
-public void start() throws IOException, InterruptedException {
+public void start() throws IOException {

Review comment:
   If we had written `throws Exception` from the beginning, this 
change would not be necessary... (Just to back up my preferred coding style of 
only using `throws Exception` in tests.)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


mjsax commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616831664


   Retest this please.







[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on issue #8508:
URL: https://github.com/apache/kafka/pull/8508#issuecomment-616828656


   Updated this PR. Will merge after Jenkins passes.







[GitHub] [kafka] mjsax commented on a change in pull request #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on a change in pull request #8508:
URL: https://github.com/apache/kafka/pull/8508#discussion_r411712314



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -84,6 +87,10 @@
 
 @Before
 public void init() {
+// When executing on Jenkins, the thread name is set to an unknown 
value,
+// hence, we need to set it explicitly to make our log-assertions pass
+Thread.currentThread().setName(threadName);

Review comment:
   Getting the name is much better! Should have done this from the beginning...









[GitHub] [kafka] vvcephei commented on a change in pull request #8520: Add explicit grace period to tumbling window example

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8520:
URL: https://github.com/apache/kafka/pull/8520#discussion_r411704050



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3262,12 +3262,15 @@ KTable-KTable 
Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by 
definition, an implicit
-// advance interval of 5 minutes).
+// advance interval of 5 minutes). Note the explicit grace 
period, as the current
+// default value is 24 hours, which may be larger than needed 
for smaller windows. 
+// Note that this default may change in future major version 
releases.

Review comment:
   I'd remove the note about changing the default. We can either find a 
non-breaking way to change defaults (see 
https://issues.apache.org/jira/browse/KAFKA-8924?focusedCommentId=17088091&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17088091
 ), or we can't change it.
   
   I'd rather not make people paranoid that we're going to break their app 
semantics from underneath them, because we won't.









[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616819297


   @junrao Alright, got it done.







[GitHub] [kafka] ConcurrencyPractitioner commented on issue #8523: Adding github whitelist

2020-04-20 Thread GitBox


ConcurrencyPractitioner commented on issue #8523:
URL: https://github.com/apache/kafka/pull/8523#issuecomment-616818171


   @junrao So then we can merge this, right?







[GitHub] [kafka] ConcurrencyPractitioner opened a new pull request #8523: Adding github whitelist

2020-04-20 Thread GitBox


ConcurrencyPractitioner opened a new pull request #8523:
URL: https://github.com/apache/kafka/pull/8523


   This PR is meant to add ConcurrencyPractitioner to the Jenkins whitelist so 
that this user can trigger tests.
   







[jira] [Assigned] (KAFKA-9893) Configurable TCP connection timeout for AdminClient

2020-04-20 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan reassigned KAFKA-9893:


Assignee: Cheng Tan

> Configurable TCP connection timeout for AdminClient
> ---
>
> Key: KAFKA-9893
> URL: https://issues.apache.org/jira/browse/KAFKA-9893
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> We do not currently allow for connection timeouts to be defined within 
> AdminClient, and as a result rely on the default OS settings to determine 
> whether a broker is inactive before selecting an alternate broker from 
> bootstrap.
> In the case of a connection timeout on initial handshake, and where 
> tcp_syn_retries is the default (6), we won't timeout an unresponsive broker 
> until ~127s - while the client will timeout sooner (~120s).
> Reducing tcp_syn_retries should mitigate the issue depending on the number of 
> unresponsive brokers within the bootstrap, though this will be applied system 
> wide, and it would be good if we could instead configure connection timeouts 
> for AdminClient.
> The use case where this came up was a customer performing DC failover tests 
> with a stretch cluster.
>  





[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2020-04-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088093#comment-17088093
 ] 

John Roesler commented on KAFKA-8924:
-

Hey [~atais] , it's been quite a while since the last activity on this ticket. 
I've just had a thought that unblocks us to fix it immediately in the next 
release (see my last comment). Do you still want to work on this?

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.





[jira] [Created] (KAFKA-9893) Configurable TCP connection timeout for AdminClient

2020-04-20 Thread Cheng Tan (Jira)
Cheng Tan created KAFKA-9893:


 Summary: Configurable TCP connection timeout for AdminClient
 Key: KAFKA-9893
 URL: https://issues.apache.org/jira/browse/KAFKA-9893
 Project: Kafka
  Issue Type: New Feature
Reporter: Cheng Tan


We do not currently allow for connection timeouts to be defined within 
AdminClient, and as a result rely on the default OS settings to determine 
whether a broker is inactive before selecting an alternate broker from 
bootstrap.

In the case of a connection timeout on initial handshake, and where 
tcp_syn_retries is the default (6), we won't timeout an unresponsive broker 
until ~127s - while the client will timeout sooner (~120s).

Reducing tcp_syn_retries should mitigate the issue depending on the number of 
unresponsive brokers within the bootstrap, though this will be applied system 
wide, and it would be good if we could instead configure connection timeouts 
for AdminClient.

The use case where this came up was a customer performing DC failover tests 
with a stretch cluster.
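The ~127s figure above follows from the kernel's exponential SYN backoff; a quick sketch of the arithmetic (assuming Linux's default initial retransmission timeout of 1s): with tcp_syn_retries = n, the connect attempt is only abandoned after 2^(n+1) - 1 seconds.

```java
public class SynTimeout {
    // Sum the waits for the initial SYN plus n retransmissions, with the
    // wait doubling each time starting from 1 second.
    static long totalSynTimeoutSeconds(int synRetries) {
        long total = 0;
        long wait = 1; // assumed initial retransmission timeout, in seconds
        for (int attempt = 0; attempt <= synRetries; attempt++) {
            total += wait;
            wait *= 2;
        }
        return total;
    }

    public static void main(String[] args) {
        // 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds for the default of 6
        System.out.println(totalSynTimeoutSeconds(6)); // 127
    }
}
```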

 





[jira] [Comment Edited] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2020-04-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088091#comment-17088091
 ] 

John Roesler edited comment on KAFKA-8924 at 4/20/20, 9:20 PM:
---

I just had another conversation with someone else who got bit by this default.

 

Changing the default grace period to zero can be accomplished right now with no 
backward compatibility concern by deprecating the current factory:
{code:java}
public static TimeWindows of(final Duration size){code}
and replacing it with a new, equivalent factory (with different default 
semantics)
{code:java}
public static TimeWindows ofSize(final Duration size){code}
 

However, I'm concerned that this will just cause the opposite problem... People 
who have very slight amounts of lateness in their streams and don't even 
realize it would be losing data (and maybe not realize that, either, until too 
late). It seems likely that people may not have disorderedness in their test 
data, but only in production, if they don't anticipate this issue.

For that reason, it feels sane to make people at least think about it:
{code:java}
public static TimeWindows ofSizeAndGracePeriod(final Duration size, final 
Duration grace)
{code}
I'll assert that it's not inconvenient at all to say explicitly:
{code:java}
ofSizeAndGracePeriod(/*whatever*/, Duration.ZERO)
{code}
If that's what you really wanted. Way more convenient than having to track down 
either dropped data or delayed Suppress results later on.


was (Author: vvcephei):
I just had another conversation with someone else who got bit by this default.

 

Changing the default grace period to zero can be accomplished right now with no 
backward compatibility concern by deprecating the current factory:
{code:java}
public static TimeWindows of(final Duration size){code}
and replacing it with a new, equivalent factory (with different default 
semantics)
{code:java}
public static TimeWindows ofSize(final Duration size){code}
 

However, I'm concerned that this will just cause the opposite problem... People 
who have very slight amounts of lateness in their streams and don't even 
realize it would be losing data (and maybe not realize that, either, until too 
late). It seems likely that people may not have disorderedness in their test 
data, but only in production, if they don't anticipate this issue.

For that reason, it feels sane to make people at least think about it:
{code:java}
public static TimeWindows ofSizeAndGracePeriod(final Duration size, final 
Duration grace)
{code}
I'll assert that it's not inconvenient at all to say explicitly:
{code:java}
ofSizeAndGracePeriod(/*whatever*/, Duration.ZERO)
{code}
If that's what you really wanted. Way more convenient than having to track down 
either dropped data or delayed Suppress results later on.

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.




[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2020-04-20 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088091#comment-17088091
 ] 

John Roesler commented on KAFKA-8924:
-

I just had another conversation with someone else who got bit by this default.

 

Changing the default grace period to zero can be accomplished right now with no 
backward compatibility concern by deprecating the current factory:
{code:java}
public static TimeWindows of(final Duration size){code}
and replacing it with a new, equivalent factory (with different default 
semantics)
{code:java}
public static TimeWindows ofSize(final Duration size){code}
 

However, I'm concerned that this will just cause the opposite problem... People 
who have very slight amounts of lateness in their streams and don't even 
realize it would be losing data (and maybe not realize that, either, until too 
late). It seems likely that people may not have disorderedness in their test 
data, but only in production, if they don't anticipate this issue.

For that reason, it feels sane to make people at least think about it:
{code:java}
public static TimeWindows ofSizeAndGracePeriod(final Duration size, final 
Duration grace)
{code}
I'll assert that it's not inconvenient at all to say explicitly:
{code:java}
ofSizeAndGracePeriod(/*whatever*/, Duration.ZERO)
{code}
If that's what you really wanted. Way more convenient than having to track down 
either dropped data or delayed Suppress results later on.
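A quick sketch of why the old default matters (plain Java, no Streams dependency): suppress-until-window-closes emits at window end plus grace, and with the legacy implicit grace of maintainMs minus size (i.e. 24h minus the window size, per the gracePeriodMs snippet quoted below), every window closes a full day after it opens, regardless of its size.

```java
import java.time.Duration;

public class WindowClose {
    // Time from window *start* to window *close* is size + grace, since the
    // window closes at its end timestamp plus the grace period.
    static Duration startToClose(Duration size, Duration grace) {
        return size.plus(grace);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(5);
        // Legacy implicit default: grace = maintainMs - size = 24h - size.
        Duration legacyGrace = Duration.ofHours(24).minus(size);
        System.out.println(startToClose(size, legacyGrace).toHours());     // 24
        System.out.println(startToClose(size, Duration.ZERO).toMinutes()); // 5
    }
}
```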

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.





[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-20 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088086#comment-17088086
 ] 

Boyang Chen commented on KAFKA-9882:


Thanks for the ticket. Are there any concrete use cases for blocking on the 
assignment?

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.
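The poll-and-check workaround described above is essentially a bounded busy-wait; a generic sketch in plain Java (no Kafka dependency — the Supplier is a hypothetical stand-in for "poll briefly, then check whether consumer.assignment() is non-empty"):

```java
import java.util.function.Supplier;

public class WaitUntil {
    // Poll a condition until it holds or the deadline passes; returns whether
    // the condition ultimately held.
    static boolean waitUntil(Supplier<Boolean> condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.get()) {
                return true;
            }
            Thread.sleep(pollMs); // stands in for a short consumer.poll(Duration)
        }
        return condition.get();   // one last check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitUntil(() -> true, 1000, 10)); // true
        System.out.println(waitUntil(() -> false, 50, 10));  // false
    }
}
```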





[GitHub] [kafka] lbradstreet commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r411692621



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, 
security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= 
LATEST_2_3

Review comment:
   @abbccdda I changed the code to add a static_membership parameter to the 
test matrix. We simply avoid this for 2.4 where this is broken due to the 
command line validation in that release.









[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616808689


   @vvcephei @mjsax Call for a review







[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on issue #8508:
URL: https://github.com/apache/kafka/pull/8508#issuecomment-616806079


   Java 8 passed.
   Java11:  
`kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
   Java14 failed due to #8519 (already merged).
   
   Retest this please.







[GitHub] [kafka] ijuma commented on issue #8519: MINOR: Upgrade gradle plugins and test libraries for Java 14 support

2020-04-20 Thread GitBox


ijuma commented on issue #8519:
URL: https://github.com/apache/kafka/pull/8519#issuecomment-616802690


   2 flaky tests, one in each job:
   * 
org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_beta]
   * 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete







[GitHub] [kafka] abbccdda commented on a change in pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on a change in pull request #8522:
URL: https://github.com/apache/kafka/pull/8522#discussion_r411680309



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -98,6 +98,7 @@ public void start() throws IOException, InterruptedException {
 putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
 putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
 putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 5);
+putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), 5);

Review comment:
   This is the actual fix, other parts are just side cleanups.









[GitHub] [kafka] abbccdda opened a new pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda opened a new pull request #8522:
URL: https://github.com/apache/kafka/pull/8522


   This PR tries to fix the flaky 
EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore
 by making the bootstrapping of the test less painful with a smaller number 
of transaction log partitions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-9868) Flaky Test EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore

2020-04-20 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9868:
---
Description: 
h3. Error Message

java.lang.AssertionError: Condition not met within timeout 15000. Expected 
ERROR state but driver is on RUNNING

 

From what we observed, in unit test with transaction turned on, it takes a 
long time to bootstrap the test as well as sometimes getting too many open 
files for system test. To reduce the start time and make tests less flaky, we 
should set the number of txn log partitions to a much smaller number than 50.

  was:
h3. Error Message

java.lang.AssertionError: Condition not met within timeout 15000. Expected 
ERROR state but driver is on RUNNING


> Flaky Test 
> EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore
> ---
>
> Key: KAFKA-9868
> URL: https://issues.apache.org/jira/browse/KAFKA-9868
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Sophie Blee-Goldman
>Assignee: Boyang Chen
>Priority: Major
>  Labels: flaky-test
>
> h3. Error Message
> java.lang.AssertionError: Condition not met within timeout 15000. Expected 
> ERROR state but driver is on RUNNING
>  
> From what we observed, in unit test with transaction turned on, it takes a 
> long time to bootstrap the test as well as sometimes getting too many open 
> files for system test. To reduce the start time and make tests less flaky, we 
> should set the number of txn log partitions to a much smaller number than 50.





[jira] [Assigned] (KAFKA-9868) Flaky Test EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore

2020-04-20 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9868:
--

Assignee: Boyang Chen

> Flaky Test 
> EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore
> ---
>
> Key: KAFKA-9868
> URL: https://issues.apache.org/jira/browse/KAFKA-9868
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Sophie Blee-Goldman
>Assignee: Boyang Chen
>Priority: Major
>  Labels: flaky-test
>
> h3. Error Message
> java.lang.AssertionError: Condition not met within timeout 15000. Expected 
> ERROR state but driver is on RUNNING





[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411676361



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411673115



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
-   final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment,
-   final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-   final Map<UUID, ClientState> clientStates,
-   final Map<TaskId, Integer> tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment,
+   final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+   final Map<UUID, ClientState> clientStates,
+   final Map<TaskId, Integer> tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-for (final Map.Entry<UUID, List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411672041



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue<UUID> clientsByTaskLoad;
+private final BiFunction<UUID, TaskId, Boolean> validClientCriteria;
+private final Set<UUID> uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map<UUID, ClientState> clientStates,
+final BiFunction<UUID, TaskId, Boolean> validClientCriteria) {
+clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates);
+this.validClientCriteria = validClientCriteria;
+}
+
+/**
+ * @return the next least loaded client that satisfies the given criteria, or null if none do
+ */
+UUID poll(final TaskId task) {
+final List<UUID> validClient = poll(task, 1);
+return validClient.isEmpty() ? null : validClient.get(0);
+}
+
+/**
+ * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid
+ * candidates for the given task
+ */
+List<UUID> poll(final TaskId task, final int numClients) {
+final List<UUID> nextLeastLoadedValidClients = new LinkedList<>();
+final Set<UUID> invalidPolledClients = new HashSet<>();
+while (nextLeastLoadedValidClients.size() < numClients) {
+UUID candidateClient;
+while (true) {
+candidateClient = clientsByTaskLoad.poll();
+if (candidateClient == null) {
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+if (validClientCriteria.apply(candidateClient, task)) {
+nextLeastLoadedValidClients.add(candidateClient);
+break;
+} else {
+invalidPolledClients.add(candidateClient);
+}
+}
+}
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+void offerAll(final Collection<UUID> clients) {
+for (final UUID client : clients) {
+offer(client);
+}
+}
+
+void offer(final UUID client) {
+if (uniqueClients.contains(client)) {

Review comment:
   I think it's just a computer-sciencey matter of principle. 
`clientsByTaskLoad` is a linear collection, so every `offer` would become 
`O(n)` if we did a `contains` call on it every time. Right now, it's only 
`O(n)` when we need to remove the prior record for the same client, and 
`O(log(n))` otherwise.
   
   Does it really matter? I'm not sure.
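A minimal sketch of the trade-off being described (invented names, not the PR's code): a companion `HashSet` makes the membership check O(1), so the linear `remove` on the underlying `PriorityQueue` is only paid on the duplicate path, and plain offers stay O(log n).

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

public class DedupQueueSketch {
    private final PriorityQueue<String> queue = new PriorityQueue<>();
    private final Set<String> members = new HashSet<>();

    boolean offer(final String client) {
        if (members.contains(client)) {   // O(1), vs queue.contains() which is O(n)
            queue.remove(client);         // O(n), but only when replacing a prior record
        }
        members.add(client);
        return queue.offer(client);       // O(log n)
    }

    String poll() {
        final String head = queue.poll();
        if (head != null) {
            members.remove(head);
        }
        return head;
    }

    public static void main(String[] args) {
        final DedupQueueSketch q = new DedupQueueSketch();
        q.offer("b");
        q.offer("a");
        q.offer("a");                     // duplicate: replaced, not doubled
        System.out.println(q.poll());     // prints "a"
        System.out.println(q.poll());     // prints "b"
        System.out.println(q.poll());     // prints "null"
    }
}
```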





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8508:
URL: https://github.com/apache/kafka/pull/8508#discussion_r411667311



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -84,6 +87,10 @@
 
 @Before
 public void init() {
+// When executing on Jenkins, the thread name is set to an unknown 
value,
+// hence, we need to set it explicitly to make our log-assertions pass
+Thread.currentThread().setName(threadName);

Review comment:
   Hmm, this might be surprising. Won't this cause the test executor thread 
to be called "threadName" from now until the end of the build?
   
   Do you think we could instead get the currentThread's name and use that in 
the assertions? Or otherwise make the assertions agnostic to the name of the 
thread?
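A sketch of the suggested alternative (hypothetical helper, not the PR's code): read the executor thread's existing name instead of renaming it, and build the expected log prefix from that, so the assertions stay agnostic to whatever Jenkins names the thread.

```java
public class ThreadNameSketch {
    // Build the expected log prefix from whatever name the current thread
    // already has, rather than calling Thread.currentThread().setName(...).
    static String expectedLogPrefix() {
        final String threadName = Thread.currentThread().getName();
        return "stream-thread [" + threadName + "]";
    }

    public static void main(String[] args) {
        System.out.println(expectedLogPrefix());
    }
}
```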









[GitHub] [kafka] junrao commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


junrao commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616787194


   @ConcurrencyPractitioner : You can just submit a separate PR to add yourself 
in .asf.yml.
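For illustration, such a change would touch the ASF metadata file at the repository root (the username below is a placeholder, and the exact key layout should be checked against the ASF `.asf.yaml` documentation):

```yaml
github:
  collaborators:
    - your-github-username
```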







[GitHub] [kafka] soenkeliebau commented on issue #8464: KAFKA-9852: Change the max duration that calls to the buffer pool can block from 2000ms to 10ms

2020-04-20 Thread GitBox


soenkeliebau commented on issue #8464:
URL: https://github.com/apache/kafka/pull/8464#issuecomment-616782288


   retest this please
   







[GitHub] [kafka] cadonna commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


cadonna commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411366111



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -89,95 +88,72 @@ public boolean assign() {
 return false;
 }
 
-final Map<UUID, List<TaskId>> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map<UUID, List<TaskId>> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map<UUID, List<TaskId>> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+final Map<TaskId, Integer> tasksToRemainingStandbys =
+statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));

-//  Stateful Active Tasks  //
+final boolean followupRebalanceNeeded = assignStatefulActiveTasks(tasksToRemainingStandbys);

Review comment:
   I love it when a comment gets killed by a meaningful method name!

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -89,95 +88,72 @@ public boolean assign() {
 return false;
 }
 
-final Map<UUID, List<TaskId>> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map<UUID, List<TaskId>> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map<UUID, List<TaskId>> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+final Map<TaskId, Integer> tasksToRemainingStandbys =
+statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));

-//  Stateful Active Tasks  //
+final boolean followupRebalanceNeeded = assignStatefulActiveTasks(tasksToRemainingStandbys);
 
-final Map<UUID, List<TaskId>> statefulActiveTaskAssignment =
-new DefaultStateConstrainedBalancedAssignor().assign(
-statefulTasksToRankedCandidates,
-configs.balanceFactor,
-sortedClients,
-clientsToNumberOfThreads,
-tasksToCaughtUpClients
-);
+assignStandbyReplicaTasks(tasksToRemainingStandbys);
+
+assignStatelessActiveTasks();
 
-//  Warmup Replica Tasks  //
+return followupRebalanceNeeded;
+}
 
-final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment =
+private boolean assignStatefulActiveTasks(final Map<TaskId, Integer> tasksToRemainingStandbys) {
+final Map<UUID, List<TaskId>> statefulActiveTaskAssignment =
 new DefaultBalancedAssignor().assign(
 sortedClients,
 statefulTasks,
 clientsToNumberOfThreads,
 configs.balanceFactor);

Review comment:
   prop:
   ```suggestion
final Map<UUID, List<TaskId>> statefulActiveTaskAssignment = new DefaultBalancedAssignor().assign(
   sortedClients,
   statefulTasks,
   clientsToNumberOfThreads,
   configs.balanceFactor
   );
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue<UUID> clientsByTaskLoad;
+private final BiFunction<UUID, TaskId, Boolean> validClientCriteria;
+private final Set<UUID> uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map<UUID, ClientState> clientStates,
+final BiFunction<UUID, TaskId, Boolean> validClientCriteria) {
+ 

[GitHub] [kafka] cmccabe commented on a change in pull request #8396: KAFKA-9754 - Trogdor - Ignore produce errors, better error statistics.

2020-04-20 Thread GitBox


cmccabe commented on a change in pull request #8396:
URL: https://github.com/apache/kafka/pull/8396#discussion_r411654793



##
File path: 
tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
##
@@ -72,6 +75,8 @@
 private final TopicsSpec activeTopics;
 private final TopicsSpec inactiveTopics;
 private final boolean useConfiguredPartitioner;
+private final boolean ignoreProduceErrors;
+private final int topicVerificationRetries;

Review comment:
   It would be better to call this "topicVerificationTries" since 0 is then 
an unreasonable value.  For REtries, 0 is a reasonable value and not something 
that we should assume is equivalent to "use the default."
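The naming point can be made concrete with a small sketch (hypothetical helpers, not Trogdor code): with a "tries" count, every value >= 1 is meaningful and 0 can be rejected outright, whereas with "retries", 0 is a legitimate setting meaning a single attempt rather than "use the default".

```java
public class TriesVsRetriesSketch {
    // "tries" counts every attempt, so 0 is nonsensical and can be rejected.
    static int attemptsFromTries(final int tries) {
        if (tries < 1) {
            throw new IllegalArgumentException("tries must be >= 1");
        }
        return tries;
    }

    // "retries" counts only attempts after the first, so 0 is a valid
    // setting meaning "try once, never retry".
    static int attemptsFromRetries(final int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        return 1 + retries;
    }

    public static void main(String[] args) {
        System.out.println(attemptsFromTries(3));    // three attempts in total
        System.out.println(attemptsFromRetries(0));  // still one attempt
    }
}
```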









[jira] [Assigned] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-04-20 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9891:
--

Assignee: Boyang Chen

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpointNODE_1 log1:2020-04-15 
> 21:11:33.942 

[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-04-20 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088044#comment-17088044
 ] 

Boyang Chen commented on KAFKA-9891:


Thanks for the report, will take a look this week.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> 
