kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463936412
########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -82,18 +108,54 @@ object FinalizedFeatureCache extends Logging { " The existing cache contents are %s").format(latest, oldFeatureAndEpoch) throw new FeatureCacheUpdateException(errorMsg) } else { - val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features) if (!incompatibleFeatures.empty) { val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" + " checks failed! Supported %s has incompatibilities with the latest %s." - ).format(SupportedFeatures.get, latest) + ).format(brokerFeatures.supportedFeatures, latest) throw new FeatureCacheUpdateException(errorMsg) } else { - val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format( + val logMsg = "Updated cache from existing %s to latest %s".format( oldFeatureAndEpoch, latest) - featuresAndEpoch = Some(latest) + synchronized { + featuresAndEpoch = Some(latest) + notifyAll() + } info(logMsg) } } } + + /** + * Causes the current thread to wait no more than timeoutMs for the specified condition to be met. + * It is guaranteed that the provided condition will always be invoked only from within a + * synchronized block. + * + * @param waitCondition the condition to be waited upon: + * - if the condition returns true, then, the wait will stop. + * - if the condition returns false, it means the wait must continue until + * timeout. + * + * @param timeoutMs the timeout (in milli seconds) + * + * @throws TimeoutException if the condition is not met within timeoutMs. 
+ */ + private def waitUntilConditionOrThrow(waitCondition: () => Boolean, timeoutMs: Long): Unit = { + if(timeoutMs < 0L) { + throw new IllegalArgumentException(s"Expected timeoutMs >= 0, but $timeoutMs was provided.") + } + val waitEndTimeNanos = System.nanoTime() + (timeoutMs * 1_000_000) Review comment: Since the app depends on monotonically increasing elapsed time values, `System.nanoTime()` is preferred. `System.currentTimeMillis()` follows the wall clock and can jump forwards or backwards when that clock is adjusted (users changing the time settings, leap seconds, internet time sync, etc.). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org