kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463936412



##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -82,18 +108,54 @@ object FinalizedFeatureCache extends Logging {
         " The existing cache contents are %s").format(latest, oldFeatureAndEpoch)
       throw new FeatureCacheUpdateException(errorMsg)
     } else {
-      val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features)
+      val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
       if (!incompatibleFeatures.empty) {
         val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" +
           " checks failed! Supported %s has incompatibilities with the latest %s."
-          ).format(SupportedFeatures.get, latest)
+          ).format(brokerFeatures.supportedFeatures, latest)
         throw new FeatureCacheUpdateException(errorMsg)
       } else {
-        val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format(
+        val logMsg = "Updated cache from existing %s to latest %s".format(
           oldFeatureAndEpoch, latest)
-        featuresAndEpoch = Some(latest)
+        synchronized {
+          featuresAndEpoch = Some(latest)
+          notifyAll()
+        }
         info(logMsg)
       }
     }
   }
+
+  /**
+   * Causes the current thread to wait no more than timeoutMs for the specified condition to be met.
+   * It is guaranteed that the provided condition will always be invoked only from within a
+   * synchronized block.
+   *
+   * @param waitCondition   the condition to be waited upon:
+   *                         - if the condition returns true, then, the wait will stop.
+   *                         - if the condition returns false, it means the wait must continue until
+   *                           timeout.
+   *
+   * @param timeoutMs       the timeout (in milli seconds)
+   *
+   * @throws                TimeoutException if the condition is not met within timeoutMs.
+   */
+  private def waitUntilConditionOrThrow(waitCondition: () => Boolean, timeoutMs: Long): Unit = {
+    if(timeoutMs < 0L) {
+      throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but $timeoutMs was provided.")
+    }
+    }
+    val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1_000_000)

Review comment:
Since this code depends on monotonically increasing elapsed-time values, `System.nanoTime()` is preferred.
`System.currentTimeMillis()` follows the wall clock, which can jump forward or backward when users change the time settings, when leap seconds are applied, or when the clock is corrected by network time sync.
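
To illustrate the point, here is a minimal sketch of a deadline-bounded wait that measures elapsed time with `System.nanoTime()`. It is not the code from this PR: the `MonotonicWait` object, the `waitUntil` method, and the explicit `lock` parameter are hypothetical names used only for the example.

```scala
import java.util.concurrent.{TimeUnit, TimeoutException}

// Hypothetical helper (not the PR's implementation): blocks on `lock` until
// `condition` returns true or `timeoutMs` has elapsed.
object MonotonicWait {
  def waitUntil(lock: AnyRef, timeoutMs: Long)(condition: () => Boolean): Unit = {
    require(timeoutMs >= 0L, s"Expected timeoutMs >= 0, but $timeoutMs was provided.")
    val startNanos = System.nanoTime()
    // TimeUnit.toNanos saturates at Long.MaxValue instead of overflowing.
    val timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    lock.synchronized {
      // Re-check the condition in a loop to tolerate spurious wakeups.
      while (!condition()) {
        // Compare differences of nanoTime() values, never absolute values:
        // the origin is arbitrary, but differences are monotonic.
        val remainingNanos = timeoutNanos - (System.nanoTime() - startNanos)
        if (remainingNanos <= 0L) {
          throw new TimeoutException(s"Condition was not met within $timeoutMs ms.")
        }
        // remainingNanos > 0 here, so this never degenerates into wait(0, 0),
        // which would block indefinitely.
        lock.wait(remainingNanos / 1000000L, (remainingNanos % 1000000L).toInt)
      }
    }
  }
}
```

A thread that changes the state read by `condition` would need to call `lock.notifyAll()` while holding the same lock, as the `synchronized { ...; notifyAll() }` block in the diff above does. A wall-clock jump during the wait does not affect the computed remaining time.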




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

