Hangleton commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862786317
##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -87,8 +103,9 @@ public Sample oldest(long now) {
Sample oldest = this.samples.get(0);
for (int i = 1; i < this.samples.size(); i++) {
Sample curr = this.samples.get(i);
- if (curr.lastWindowMs < oldest.lastWindowMs)
+ if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) &&
curr.isActive()) { // only consider active samples
Review Comment:
Is the `isActive` check really required? Before the oldest sample is computed,
expired samples are reset, which sets their `lastWindowMs` to `now`.
Marginal note: this assumes at least one sample is active (that is, if all
samples between 1 and `samples.size()` are not active, the first sample in the
list has to be active and would be the current sample).
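
To make the argument concrete, here is a minimal sketch of the reasoning. It is not the Kafka class: `OldestSampleSketch`, `oldestWindowStart` and `oldestAfterPurge` are hypothetical names, and the sketch assumes that `reset(now)` only needs to move `lastWindowMs` to `now` and that no sample starts after `now`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SampledStat, reduced to what the argument needs.
final class OldestSampleSketch {

    static final class Sample {
        long lastWindowMs;

        Sample(long lastWindowMs) {
            this.lastWindowMs = lastWindowMs;
        }

        // Assumption: a reset moves the window start to `now`.
        void reset(long now) {
            this.lastWindowMs = now;
        }
    }

    // Same shape as the purge loop in the diff: expired samples are reset.
    static void purgeObsoleteSamples(List<Sample> samples, long expireAgeMs, long now) {
        for (Sample sample : samples) {
            if (now - sample.lastWindowMs >= expireAgeMs)
                sample.reset(now);
        }
    }

    // Oldest window start, with no isActive check.
    static long oldestWindowStart(List<Sample> samples) {
        long oldest = samples.get(0).lastWindowMs;
        for (int i = 1; i < samples.size(); i++)
            oldest = Math.min(oldest, samples.get(i).lastWindowMs);
        return oldest;
    }

    // Purge first, then look for the oldest sample.
    static long oldestAfterPurge(long[] windowStartsMs, long expireAgeMs, long now) {
        List<Sample> samples = new ArrayList<>();
        for (long start : windowStartsMs)
            samples.add(new Sample(start));
        purgeObsoleteSamples(samples, expireAgeMs, now);
        return oldestWindowStart(samples);
    }

    public static void main(String[] args) {
        // Sample 0 (start 0) is expired at now = 2500 with a 2000 ms expiry and is
        // reset to 2500; sample 1 (start 1500) is still live and is the oldest.
        System.out.println(oldestAfterPurge(new long[] {0L, 1500L}, 2000L, 2500L));
    }
}
```

Under these assumptions a reset sample can never compare strictly older than a live one, so the plain `lastWindowMs` comparison already skips it.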
##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -110,25 +127,40 @@ public String toString() {
protected void purgeObsoleteSamples(MetricConfig config, long now) {
long expireAge = config.samples() * config.timeWindowMs();
for (Sample sample : samples) {
- if (now - sample.lastWindowMs >= expireAge)
+ if (now - sample.getLastWindowMs() >= expireAge)
sample.reset(now);
}
}
protected static class Sample {
- public double initialValue;
- public long eventCount;
- public long lastWindowMs;
- public double value;
+ private double initialValue;
+ private long eventCount;
+ private long lastWindowMs;
+ private double value;
+
+ /**
+ * A Sample object could be re-used in a ring buffer to store future
samples for space efficiency.
+ * Thus, a sample could be in either of the following lifecycle states:
+ * NOT_INITIALIZED: Sample has not been initialized.
+ * ACTIVE: Sample has values and is currently
+ * RESET: Sample has been reset and the object is not destroyed so
that it could be used for storing future
+ * samples.
+ */
+ private enum LifecycleState {
Review Comment:
Cf. the comment on the calculation of the oldest sample: not sure this is
required, given that `reset` updates the `lastWindowMs` of the sample to the
"current" timestamp at the time of the reset.
##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##########
@@ -68,28 +63,61 @@ public double measure(MetricConfig config, long now) {
}
public long windowSize(MetricConfig config, long now) {
- // purge old samples before we compute the window size
+ // Purge obsolete samples. Obsolete samples are the ones which are not
relevant to the current calculation
+ // because their creation time is outside (before) the duration of
time window used to calculate rate.
stat.purgeObsoleteSamples(config, now);
/*
* Here we check the total amount of time elapsed since the oldest
non-obsolete window.
- * This give the total windowSize of the batch which is the time used
for Rate computation.
- * However, there is an issue if we do not have sufficient data for
e.g. if only 1 second has elapsed in a 30 second
- * window, the measured rate will be very high.
- * Hence we assume that the elapsed time is always N-1 complete
windows plus whatever fraction of the final window is complete.
+ * This gives the duration of computation time window which used to
calculate Rate.
+ *
+ * For scenarios when rate computation is performed after at least
`config.samples` have completed,
+ * the duration of computation time window is determined by:
+ * window duration = (now - start time of oldest non-obsolete
window)
+ *
+ * ## Special case: First ever window
+ * A special scenario occurs when rate calculation is performed before
at least `config.samples` have completed
+ * (e.g. if only 1 second has elapsed in a 30 second). In such a
scenario, window duration would be equal to the
+ * time elapsed in the current window (since oldest non-obsolete
window is current window). This leads to the
+ * following values for rate. Consider the following example:
+ * config.timeWindowMs() = 1s
+ * config.samples() = 2
+ * Record events (E) at timestamps:
+ * E1 = CurrentTimeStamp (T1)
+ * E2 = T1 + 30ms
+ * E2 = T1 + 60ms
Review Comment:
E2 -> E3
##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -34,22 +34,38 @@
*/
public abstract class SampledStat implements MeasurableStat {
- private double initialValue;
+ private final double initialValue;
private int current = 0;
+
protected List<Sample> samples;
public SampledStat(double initialValue) {
this.initialValue = initialValue;
this.samples = new ArrayList<>(2);
}
+ /**
+ * {@inheritDoc}
+ *
+ * On every record, do the following:
+ * 1. Check if the current window has expired
+ * 2. If yes, then advance the current pointer to new window. The start
time of the new window is set to nearest
+ * possible starting point for the new window. The nearest starting
point occurs at config.timeWindowMs intervals
+ * from the end time of last known window.
+ * 3. Update the recorded value for the current window
+ * 4. Increase the number of event count
+ */
@Override
- public void record(MetricConfig config, double value, long timeMs) {
- Sample sample = current(timeMs);
- if (sample.isComplete(timeMs, config))
- sample = advance(config, timeMs);
- update(sample, config, value, timeMs);
- sample.eventCount += 1;
+ public void record(MetricConfig config, double value, long
recordingTimeMs) {
+ Sample sample = current(recordingTimeMs);
+ if (sample.isComplete(recordingTimeMs, config)) {
+ final long previousWindowStartTime = sample.getLastWindowMs();
+ sample = advance(config, recordingTimeMs);
Review Comment:
This should be equivalent to:
```
final long previousWindowStartTime = sample.getLastWindowMs();
final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs();
sample = advance(config, recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs()));
```
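
For illustration, the arithmetic in the snippet above can be checked in isolation. This is only a sketch: `WindowAlignmentSketch` and `alignedWindowStart` are hypothetical names, not part of the PR, and it assumes `recordingTimeMs` is at or after the end of the previous window (Java's `%` returns a negative remainder for a negative dividend, so earlier timestamps would behave differently).

```java
// Hypothetical helper that isolates the alignment arithmetic from the suggestion.
final class WindowAlignmentSketch {

    // Returns the latest window boundary at or before recordingTimeMs, where
    // boundaries fall every timeWindowMs after the end of the previous window.
    // Assumption: recordingTimeMs >= previousWindowStartMs + timeWindowMs.
    static long alignedWindowStart(long previousWindowStartMs, long timeWindowMs, long recordingTimeMs) {
        final long previousWindowEndMs = previousWindowStartMs + timeWindowMs;
        final long offsetIntoWindowMs = (recordingTimeMs - previousWindowEndMs) % timeWindowMs;
        return recordingTimeMs - offsetIntoWindowMs;
    }

    public static void main(String[] args) {
        // Previous window [0, 1000), recording at 2350: boundaries are 1000, 2000, ...
        // so the new window starts at 2000.
        System.out.println(alignedWindowStart(0L, 1000L, 2350L));
    }
}
```

With a previous window starting at 0 and a 1000 ms window, a record at 2350 ms lands in the window starting at 2000 ms, and a record exactly at 1000 ms starts a window at 1000 ms.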