I'm attaching a patch to rrd_update.c against trunk that fixes the phase-shift bug described below.

When primary data points were missed, the SEASONAL and DEVSEASONAL archives were marked as up-to-date so that they would not be written to. Not writing to these archives was correct, but the code also failed to advance the row pointers (cur_row) within the SEASONAL and DEVSEASONAL archives. As a result, subsequent updates went to the wrong rows and the seasonal coefficients drifted out of phase.
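The C sketch below makes the pointer problem concrete. It is a toy model, not RRDtool code, and `rows_old`, `rows_new`, `last_slot` and `demo_times` are hypothetical names. It replays the update timeline of the test script quoted below, using a season of 8 PDPs, and reports which seasonal slot the last update lands in. The pre-patch rule is an assumption based on a reading of the diff: a bulk update of more than two PDPs zeroes rra_step_cnt, so cur_row does not move at all.

```c
#include <stddef.h>

#define SEASON 8 /* season length in PDPs ($period in the test script) */

/* Toy model, not RRDtool code: how far cur_row advances in a SEASONAL
 * archive for an update that covers `elapsed` primary data points. */
typedef unsigned long (*rows_fn)(unsigned long elapsed);

/* Pre-patch (assumed from the diff): more than 2 elapsed PDPs set
 * rra_step_cnt to 0, so cur_row does not move at all. */
static unsigned long rows_old(unsigned long elapsed)
{
    return elapsed > 2 ? 0 : elapsed;
}

/* Patched: cur_row moves once per elapsed PDP; only the write is skipped. */
static unsigned long rows_new(unsigned long elapsed)
{
    return elapsed;
}

/* Timeline of the test script in PDP steps: updates at 0..9, the readings
 * at 10..12 are missed, and updates resume at 13..16. */
static const unsigned long demo_times[] = {
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 14, 15, 16
};
#define DEMO_N (sizeof demo_times / sizeof demo_times[0])

/* Replays the update times and returns the seasonal slot (0..SEASON-1)
 * that the last update is written to. The first update is treated as
 * covering one PDP. Assumes at least one row has been consumed. */
static unsigned long last_slot(const unsigned long *times, size_t n, rows_fn f)
{
    unsigned long rows = 0, prev = 0;
    for (size_t i = 0; i < n; i++) {
        unsigned long elapsed = (i == 0) ? 1 : times[i] - prev;
        rows += f(elapsed);
        prev = times[i];
    }
    return (rows - 1) % SEASON;
}
```

With these definitions, `last_slot(demo_times, DEMO_N, rows_new)` gives 0, which matches the time-based slot `16 % 8`. `last_slot(demo_times, DEMO_N, rows_old)` gives 4, a lag of four steps. That lag is consistent with the four-step shift of the prediction spike described in the report.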

Rather than marking these archives as up-to-date (by setting rra_step_cnt[rra_idx] = 0), my patch allocates a new skip_update array with one entry per RRA. An entry is set to 1 for SEASONAL and DEVSEASONAL archives that have missed one or more primary data points; the test is now rra_step_cnt[rra_idx] > 1 rather than > 2. In write_to_rras(), cur_row is still advanced for every archive, but skip_update is checked just before each call to write_RRA_row(), so the skipped rows are stepped over without being written.
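The control flow described above can be sketched in simplified C. This is an illustration, not the patch itself. `toy_rra`, `toy_cf` and `toy_update` are hypothetical stand-ins for rra_def[].cf_nam, rra_ptr[].cur_row and the two loops the patch touches, and the `writes` counter exists only to show which rows would actually be written.

```c
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the RRA definition and pointer;
 * the real rrd_t layout is more involved. */
enum toy_cf { TOY_AVERAGE, TOY_SEASONAL, TOY_DEVSEASONAL };

struct toy_rra {
    enum toy_cf   cf;
    unsigned long row_cnt;  /* rows in the ring buffer */
    unsigned long cur_row;  /* row most recently written or stepped over */
    unsigned long writes;   /* rows actually written (illustration only) */
};

/* Sketch of the patched flow: flag seasonal RRAs that missed a PDP, then
 * advance cur_row once per step for every RRA, writing only unflagged ones. */
static void toy_update(struct toy_rra *rra, size_t rra_cnt,
                       const unsigned long *rra_step_cnt,
                       unsigned long *skip_update)
{
    for (size_t i = 0; i < rra_cnt; i++) {
        int seasonal = rra[i].cf == TOY_SEASONAL || rra[i].cf == TOY_DEVSEASONAL;
        /* mirrors: if (rra_step_cnt[rra_idx] > 1) skip_update[rra_idx] = 1; */
        skip_update[i] = (seasonal && rra_step_cnt[i] > 1) ? 1 : 0;
    }
    for (size_t i = 0; i < rra_cnt; i++) {
        for (unsigned long s = 0; s < rra_step_cnt[i]; s++) {
            if (++rra[i].cur_row >= rra[i].row_cnt)
                rra[i].cur_row = 0;        /* wrap around */
            if (!skip_update[i])
                rra[i].writes++;           /* stands in for write_RRA_row() */
        }
    }
}
```

For a bulk update covering four steps, an AVERAGE archive both advances and writes four rows. A SEASONAL archive advances the same four rows, wrapping if necessary, but writes none. Its next regular update therefore lands in the row that matches its timestamp.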

The bug demonstration script supplied in my previous email now passes.

The patch also contains a few touch-ups: spelling fixes, comment reflowing, and a trivial interface change to write_RRA_row(), which now takes rra_time by value instead of by pointer.

Please give it a whirl!

Evan

Evan Miller wrote:
I'd like to report a bug that appears in RRDtool 1.2.15, 1.2.99907052400, and 1.2.99907080300.

The problem: When a Holt-Winters archive misses several updates, the predictions for those timestamps are postponed rather than ignored. For example, if data collection halts at 1 PM and resumes at 2 PM, then the predictions will be shifted by an hour; the 1:15 PM prediction will appear at 2:15 PM, the 1:30 PM prediction will appear at 2:30 PM, etc. A data outage can corrupt all the predictions in an archive in this way.

Below is a test script that demonstrates the problem. In the first season, there is a spike at T=T_0 + 4 (where T_0 is the first timestamp). If a season has length S, we'd expect a spike in the prediction for T=T_0 + S + 4; however, if I introduce a data outage for T=T_0 + S + 1 .. T_0 + S + 3, the prediction spike moves to T=T_0 + S + 8. RRDtool apparently treats a season as a period of time for which observations exist; I believe the correct behavior is to treat a season as a period of time regardless of whether observations are available.

I'll try to find the source of the bug myself, but I'd appreciate any insight, patches, or workarounds people might have concerning it.

Evan




#!/usr/bin/perl -w

use RRDs 1.2015;
use Data::Dumper;
use strict;

# Updates an RRA, creating it if it doesn't exist
sub rrd_update($$$$$$@) {
     my ($path, $type, $time, $step, $readings, $archives, $heartbeat) = @_;

     $heartbeat ||= 5;

     if (!-f $path) {
         my @args = ($path, "--start", $time - $step, "--step", $step,
                 (map {
                  "DS:$_:$type:".($step*$heartbeat).":U:U"
                  } keys %$readings),
                 @$archives);
         RRDs::create @args;
         return "RRDtool create error: ".join(' ', @args).RRDs::error if RRDs::error;
     }
     my $info = RRDs::info($path);
     RRDs::update $path, "--template", join(":", sort keys %$readings),
         "$time:".join(":", map { $$readings{$_} } sort keys %$readings);

     return "RRDtool update error on $path: ".RRDs::error if RRDs::error;
}

sub rrd_hw_archive_definition($$$$) {
     my ($period, $alpha, $beta, $gamma) = @_;
     my $threshold = 3;
     my $window    = 5;
     return [
             "RRA:HWPREDICT:".(2*$period).":$alpha:$beta:$period:2",  # 1
             "RRA:SEASONAL:$period:$gamma:1",                         # 2
             "RRA:DEVPREDICT:".(2*$period).":4",                      # 3
             "RRA:DEVSEASONAL:$period:$gamma:1",                      # 4
             "RRA:FAILURES:$period:$threshold:$window:4",             # 5
             ];
}

my $period = 8;
my $rrd_file = "/tmp/hwshift.rrd";

unlink $rrd_file;

my $hwarchives = rrd_hw_archive_definition($period, 0.1, 0.2, 0.3);

my $step = 1;

my $time = 1000000000;

my $counter;

rrd_update $rrd_file, "COUNTER", $time, $step, { reading => $counter = 0 }, $hwarchives, 1;

# period 1
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
# spike occurs here
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 10 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;

# period 2
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
# missed updates
$time++; $counter += 1;
$time++; $counter += 1;
# spike occurs here again
$time++; $counter += 10;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;
rrd_update $rrd_file, "COUNTER", ++$time, $step, { reading => $counter += 1 }, $hwarchives;

my ($start, $rrd_step, $names, $data) = RRDs::fetch($rrd_file, "HWPREDICT", "-s",
                 $time - 1, "-e", $time - 1, "-r", $step);

print "Most recent prediction should be 1, is $$data[0][0]\n";

Index: src/rrd_update.c
===================================================================
--- src/rrd_update.c	(revision 1191)
+++ src/rrd_update.c	(working copy)
@@ -73,9 +73,9 @@
     info_t *);
 
 static int allocate_data_structures(
-    rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp,
-    const char *tmplt, long **tmpl_idx, unsigned long *tmpl_cnt, 
-    unsigned long **rra_step_cnt, rrd_value_t **pdp_new);
+    rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, 
+    long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, 
+    unsigned long **skip_update, rrd_value_t **pdp_new);
 
 static int parse_template(rrd_t *rrd, const char *tmplt,
     unsigned long *tmpl_cnt, long *tmpl_idx);
@@ -96,6 +96,7 @@
     unsigned long  tmpl_cnt,
     info_t       **pcdp_summary,
     int            version,
+    unsigned long *skip_update,
     int           *schedule_smooth);
 
 static int parse_ds(rrd_t *rrd, char **updvals, long *tmpl_idx, char *input,
@@ -127,7 +128,8 @@
     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
     rrd_value_t **last_seasonal_coef, rrd_value_t **seasonal_coef,
-    rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth);
+    rrd_value_t *pdp_temp, unsigned long *rra_current, 
+    unsigned long *skip_update, int *schedule_smooth);
 
 static int do_schedule_smooth(rrd_t *rrd, unsigned long rra_idx, 
     unsigned long elapsed_pdp_st);
@@ -165,11 +167,12 @@
 
 static int write_to_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
     unsigned long *rra_step_cnt, unsigned long rra_begin, 
-    unsigned long *rra_current, time_t current_time, info_t **pcdp_summary);
+    unsigned long *rra_current, time_t current_time, 
+    unsigned long *skip_update, info_t **pcdp_summary);
 
 static int write_RRA_row(rrd_file_t *rrd_file, rrd_t *rrd, unsigned long rra_idx,
     unsigned long *rra_current, unsigned short CDP_scratch_idx, info_t **pcdp_summary,
-    time_t *rra_time);
+    time_t rra_time);
 
 static int smooth_all_rras(rrd_t *rrd, rrd_file_t *rrd_file, 
     unsigned long rra_begin);
@@ -355,6 +358,7 @@
     int            version;     /* rrd version */
     rrd_file_t    *rrd_file;
     char          *arg_copy;    /* for processing the argv */
+    unsigned long *skip_update; /* RRAs to advance but not write */
 
     /* need at least 1 arguments: data. */
     if (argc < 1) {
@@ -382,7 +386,7 @@
 
     if (allocate_data_structures(&rrd, &updvals, 
                     &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
-                    &rra_step_cnt, &pdp_new) == -1) {
+                    &rra_step_cnt, &skip_update, &pdp_new) == -1) {
         goto err_close;
     }
 
@@ -395,7 +399,7 @@
         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current, 
                         &current_time, &current_time_usec, pdp_temp, pdp_new,
                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt, &pcdp_summary, 
-                        version, &schedule_smooth) == -1) {
+                        version, skip_update, &schedule_smooth) == -1) {
             free(arg_copy);
             break;
         }
@@ -431,6 +435,7 @@
     free(pdp_new);
     free(tmpl_idx);
     free(pdp_temp);
+    free(skip_update);
     free(updvals);
     return 0;
 
@@ -438,6 +443,7 @@
     free(pdp_new);
     free(tmpl_idx);
     free(pdp_temp);
+    free(skip_update);
     free(updvals);
   err_close:
     rrd_close(rrd_file);
@@ -491,9 +497,15 @@
  * Returns 0 on success, -1 on error.
  */
 static int allocate_data_structures(
-    rrd_t *rrd, char ***updvals, rrd_value_t **pdp_temp, const char *tmplt, 
-    long **tmpl_idx, unsigned long *tmpl_cnt, unsigned long **rra_step_cnt, 
-    rrd_value_t **pdp_new) 
+    rrd_t         *rrd, 
+    char        ***updvals, 
+    rrd_value_t  **pdp_temp, 
+    const char    *tmplt, 
+    long         **tmpl_idx, 
+    unsigned long *tmpl_cnt, 
+    unsigned long **rra_step_cnt, 
+    unsigned long **skip_update,
+    rrd_value_t  **pdp_new) 
 {
     unsigned i, ii;
     if ((*updvals = (char **)malloc(sizeof(char *) 
@@ -501,17 +513,20 @@
         rrd_set_error("allocating updvals pointer array");
         return -1;
     }
-
     if ((*pdp_temp = (rrd_value_t *)malloc(sizeof(rrd_value_t)
                            * rrd->stat_head->ds_cnt)) == NULL) {
         rrd_set_error("allocating pdp_temp ...");
         goto err_free_updvals;
     }
-
+    if ((*skip_update = (unsigned long *)malloc(sizeof(unsigned long)
+                           * rrd->stat_head->rra_cnt)) == NULL) {
+        rrd_set_error("allocating skip_update...");
+        goto err_free_pdp_temp;
+    }
     if ((*tmpl_idx = (long *)malloc(sizeof(unsigned long)
                            * (rrd->stat_head->ds_cnt + 1))) == NULL) {
         rrd_set_error("allocating tmpl_idx ...");
-        goto err_free_pdp_temp;
+        goto err_free_skip_update;
     }
     if ((*rra_step_cnt = (unsigned long *)malloc(sizeof(unsigned long) 
                            * (rrd->stat_head->rra_cnt))) == NULL) {
@@ -534,20 +549,24 @@
 
     if (tmplt != NULL) {
         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
-            goto err_free_tmpl_idx;
+            goto err_free_rra_step_cnt;
         }
     }
 
     if ((*pdp_new = (rrd_value_t *)malloc(sizeof(rrd_value_t)
                           * rrd->stat_head->ds_cnt)) == NULL) {
         rrd_set_error("allocating pdp_new ...");
-        goto err_free_tmpl_idx;
+        goto err_free_rra_step_cnt;
     }
 
     return 0;
 
+err_free_rra_step_cnt:
+    free(*rra_step_cnt);
 err_free_tmpl_idx:
     free(*tmpl_idx);
+err_free_skip_update:
+    free(*skip_update);
 err_free_pdp_temp:
     free(*pdp_temp);
 err_free_updvals:
@@ -621,6 +640,7 @@
     unsigned long  tmpl_cnt,
     info_t       **pcdp_summary,
     int            version,
+    unsigned long *skip_update,
     int           *schedule_smooth)
 {
     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
@@ -684,7 +704,7 @@
                                 &last_seasonal_coef,
                                 &seasonal_coef,
                                 pdp_temp, rra_current,
-                                schedule_smooth) == -1)
+                                skip_update, schedule_smooth) == -1)
         {
             goto err_free_coefficients;
         }
@@ -693,9 +713,8 @@
         {
             goto err_free_coefficients;
         }
-        if (write_to_rras(rrd, rrd_file, 
-                rra_step_cnt, rra_begin, rra_current, 
-                *current_time, pcdp_summary) == -1)
+        if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin, 
+                rra_current, *current_time, skip_update, pcdp_summary) == -1)
         {
             goto err_free_coefficients;
         }
@@ -984,17 +1003,12 @@
     double *post_int,
     unsigned long *proc_pdp_cnt) 
 {
-
-    unsigned long proc_pdp_st;  /* which pdp_st was the last
-                                 * to be processed */
-    unsigned long occu_pdp_st;  /* when was the pdp_st
-                                 * before the last update
+    unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
+    unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
                                  * time */
-    unsigned long proc_pdp_age; /* how old was the data in
-                                 * the pdp prep area when it
-                                 * was last updated */
-    unsigned long occu_pdp_age; /* how long ago was the last
-                                 * pdp_step time */
+    unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
+                                 * when it was last updated */
+    unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
 
     /* when was the current pdp started */
     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
@@ -1182,7 +1196,7 @@
 /*
  * Iterate over all the RRAs for a given DS and:
  * 1. Decide whether to schedule a smooth later
- * 2. Shift the seasonal array if it's a bulk update
+ * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
  * 3. Update the CDP
  *
  * Returns 0 on success, -1 on error
@@ -1191,7 +1205,8 @@
     rrd_t *rrd, unsigned long *rra_step_cnt, unsigned long rra_begin, 
     rrd_file_t *rrd_file, unsigned long elapsed_pdp_st, unsigned long proc_pdp_cnt,
     rrd_value_t  **last_seasonal_coef, rrd_value_t  **seasonal_coef,
-    rrd_value_t *pdp_temp, unsigned long *rra_current, int *schedule_smooth) 
+    rrd_value_t *pdp_temp, unsigned long *rra_current, 
+    unsigned long *skip_update, int *schedule_smooth)
 {
     unsigned long rra_idx;
     /* index into the CDP scratch array */
@@ -1204,6 +1219,7 @@
     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
         start_pdp_offset = rrd->rra_def[rra_idx].pdp_cnt - proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
+        skip_update[rra_idx] = 0;
         if (start_pdp_offset <= elapsed_pdp_st) {
             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
                     rrd->rra_def[rra_idx].pdp_cnt + 1;
@@ -1216,11 +1232,8 @@
              * so that they will be correct for the next observed value; note that for
              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
-            if (rra_step_cnt[rra_idx] > 2) {
-                /* skip update by resetting rra_step_cnt[rra_idx], note that this is not data
-                 * source specific; this is due to the bulk update, not a DNAN value
-                 * for the specific data source. */
-                rra_step_cnt[rra_idx] = 0;
+            if (rra_step_cnt[rra_idx] > 1) {
+                skip_update[rra_idx] = 1;
                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
                                 elapsed_pdp_st, last_seasonal_coef);
                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
@@ -1237,7 +1250,6 @@
             }
             *rra_current = rrd_tell(rrd_file);
         }
-        /* if cf is DEVSEASONAL or SEASONAL */
         if (rrd_test_error())
             return -1;
 
@@ -1644,15 +1656,12 @@
     unsigned long rra_begin, 
     unsigned long *rra_current,
     time_t current_time,
+    unsigned long *skip_update,
     info_t **pcdp_summary)
 {
     unsigned long rra_idx;
     unsigned long rra_start;    
     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
-    /* number of PDP steps since the last update that
-     * are assigned to the first CDP to be generated
-     * since the last update. */
-    unsigned short scratch_idx;
     time_t    rra_time = 0; /* time of update for a RRA */
 
     /* Ready to write to disk */
@@ -1667,7 +1676,7 @@
             rrd->rra_ptr[rra_idx].cur_row++;
             if (rrd->rra_ptr[rra_idx].cur_row >= rrd->rra_def[rra_idx].row_cnt)
                 rrd->rra_ptr[rra_idx].cur_row = 0; /* wrap around */
-            /* positition on the first row */
+            /* position on the first row */
             rra_pos_tmp = rra_start +
                     (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
                     sizeof(rrd_value_t);
@@ -1681,22 +1690,20 @@
 #ifdef DEBUG
             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
 #endif
-            scratch_idx = CDP_primary_val;
-            if (*pcdp_summary != NULL) {
-                rra_time = (current_time - current_time
-                                % (rrd->rra_def[rra_idx].pdp_cnt *
-                                        rrd->stat_head->pdp_step))
-                        - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
-                                        rrd->stat_head->pdp_step);
+            if (!skip_update[rra_idx]) {
+                if (*pcdp_summary != NULL) {
+                    rra_time = (current_time - current_time
+                                    % (rrd->rra_def[rra_idx].pdp_cnt *
+                                            rrd->stat_head->pdp_step))
+                            - ((rra_step_cnt[rra_idx] - 1) * rrd->rra_def[rra_idx].pdp_cnt *
+                                            rrd->stat_head->pdp_step);
+                }
+                if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
+                                        pcdp_summary, rra_time) == -1)
+                    return -1;
             }
-            if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current, scratch_idx, 
-                                    pcdp_summary, &rra_time) == -1)
-                return -1;
-            if (rrd_test_error())
-                return -1;
 
             /* write other rows of the bulk update, if any */
-            scratch_idx = CDP_secondary_val;
             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
                 if (++rrd->rra_ptr[rra_idx].cur_row == rrd->rra_def[rra_idx].row_cnt) {
 #ifdef DEBUG
@@ -1717,21 +1724,20 @@
 #endif
                     *rra_current = rra_start;
                 }
-                if (*pcdp_summary != NULL) {
-                    rra_time = (current_time - current_time
-                                    % (rrd->rra_def[rra_idx].pdp_cnt *
-                                            rrd->stat_head->pdp_step))
-                            -
-                            ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
-                             rrd->stat_head->pdp_step);
+                if (!skip_update[rra_idx]) {
+                    if (*pcdp_summary != NULL) {
+                        rra_time = (current_time - current_time
+                                        % (rrd->rra_def[rra_idx].pdp_cnt *
+                                                rrd->stat_head->pdp_step))
+                                -
+                                ((rra_step_cnt[rra_idx] - 2) * rrd->rra_def[rra_idx].pdp_cnt *
+                                 rrd->stat_head->pdp_step);
+                    }
+                    if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
+                                            CDP_secondary_val, pcdp_summary, rra_time) == -1)
+                        return -1;
                 }
-                if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
-                                        scratch_idx, pcdp_summary, &rra_time) == -1)
-                    return -1;
             }
-
-            if (rrd_test_error())
-                return -1;
         }
         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
                 sizeof(rrd_value_t);
@@ -1752,7 +1758,7 @@
     unsigned long *rra_current,
     unsigned short CDP_scratch_idx,
     info_t **pcdp_summary,
-    time_t *rra_time)
+    time_t rra_time)
 {
     unsigned long ds_idx, cdp_idx;
     infoval   iv;
@@ -1769,7 +1775,7 @@
             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
             /* append info to the return hash */
             *pcdp_summary = info_push(*pcdp_summary,
-                     sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", *rra_time,
+                     sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]", rra_time,
                            rrd->rra_def[rra_idx].cf_nam,
                            rrd->rra_def[rra_idx].pdp_cnt,
                            rrd->ds_def[ds_idx].ds_nam), RD_I_VAL, iv);
_______________________________________________
rrd-developers mailing list
https://lists.oetiker.ch/cgi-bin/listinfo/rrd-developers
