Changeset: 92bd79411168 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/92bd79411168
Modified Files:
        gdk/gdk_logger.c
Branch: Jun2023
Log Message:

Make sure we don't write to a WAL file after an earlier write failed.
Also, a write failure is a reason to rotate the log files.


diffs (134 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -225,7 +225,9 @@ log_write_format(logger *lg, logformat *
 {
        assert(data->id || data->flag);
        assert(!lg->inmemory);
-       if (mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
+           mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
            mnstr_writeInt(lg->current->output_log, data->id))
                return GDK_SUCCEED;
        TRC_CRITICAL(GDK, "write failed\n");
@@ -271,7 +273,9 @@ log_write_id(logger *lg, int id)
 {
        assert(!lg->inmemory);
        assert(id >= 0);
-       if (mnstr_writeInt(lg->current->output_log, id))
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR &&
+           mnstr_writeInt(lg->current->output_log, id))
                return GDK_SUCCEED;
        TRC_CRITICAL(GDK, "write failed\n");
        return GDK_FAIL;
@@ -2373,7 +2377,12 @@ log_activate(logger *lg)
        bool flush_cleanup = false;
        gdk_return res = GDK_SUCCEED;
        rotation_lock(lg);
-       if (!lg->flushnow && !lg->current->next && lg->current->drops > 100000 
&& (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 && lg->saved_id+1 == lg->id && 
ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */) {
+       if (!lg->flushnow &&
+           !lg->current->next &&
+           lg->current->drops > 100000 &&
+           (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 &&
+           lg->saved_id + 1 == lg->id &&
+           ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this 
file */) {
                lg->id++;
                /* start new file */
                res = log_open_output(lg);
@@ -2521,7 +2530,9 @@ log_constant(logger *lg, int type, ptr v
 
        gdk_return (*wt) (const void *, stream *, size_t) = 
BATatoms[type].atomWrite;
 
-       if (log_write_format(lg, &l) != GDK_SUCCEED ||
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
+           log_write_format(lg, &l) != GDK_SUCCEED ||
            !mnstr_writeLng(lg->current->output_log, nr) ||
            mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
            !mnstr_writeLng(lg->current->output_log, offset)) {
@@ -2552,6 +2563,9 @@ string_writer(logger *lg, BAT *b, lng of
 
        if (!buf)
                return GDK_FAIL;
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
+               return GDK_FAIL;
        BATiter bi = bat_iterator(b);
        BUN p = (BUN)offset;
        for ( ; p < end; ) {
@@ -2579,7 +2593,9 @@ string_writer(logger *lg, BAT *b, lng of
                                sz += len;
                        }
                }
-               if (sz && (!mnstr_writeLng(lg->current->output_log, (lng) sz) 
|| mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
+               if (sz &&
+                   (!mnstr_writeLng(lg->current->output_log, (lng) sz) ||
+                    mnstr_write(lg->current->output_log, buf, sz, 1) != 1)) {
                        res = GDK_FAIL;
                        break;
                }
@@ -2609,11 +2625,17 @@ internal_log_bat(logger *lg, BAT *b, log
 
        gdk_return (*wt) (const void *, stream *, size_t) = 
BATatoms[b->ttype].atomWrite;
 
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR) {
+               ok = GDK_FAIL;
+               goto bailout;
+       }
+
        if (lg->total_cnt == 0) /* signals single bulk message or first part of 
bat logged in parts */
                if (log_write_format(lg, &l) != GDK_SUCCEED ||
-                       !mnstr_writeLng(lg->current->output_log, 
total_cnt?total_cnt:cnt) ||
-                       mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
-                       !mnstr_writeLng(lg->current->output_log, 
total_cnt?-1:offset)) { /* offset = -1 indicates bat was logged in parts */
+                   !mnstr_writeLng(lg->current->output_log, 
total_cnt?total_cnt:cnt) ||
+                   mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 ||
+                   !mnstr_writeLng(lg->current->output_log, 
total_cnt?-1:offset)) { /* offset = -1 indicates bat was logged in parts */
                        ok = GDK_FAIL;
                        goto bailout;
                }
@@ -2696,7 +2718,9 @@ log_bat_persists(logger *lg, BAT *b, log
        l.flag = LOG_CREATE;
        l.id = id;
        if (!LOG_DISABLED(lg)) {
-               if (log_write_format(lg, &l) != GDK_SUCCEED ||
+               assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+               if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
+                   log_write_format(lg, &l) != GDK_SUCCEED ||
                    mnstr_write(lg->current->output_log, &ta, 1, 1) != 1) {
                        log_unlock(lg);
                        ATOMIC_DEC(&lg->current->refcount);
@@ -2819,7 +2843,9 @@ log_delta(logger *lg, BAT *uid, BAT *uva
        gdk_return (*wh) (const void *, stream *, size_t) = 
BATatoms[TYPE_oid].atomWrite;
        gdk_return (*wt) (const void *, stream *, size_t) = 
BATatoms[uval->ttype].atomWrite;
 
-       if (log_write_format(lg, &l) != GDK_SUCCEED ||
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
+           log_write_format(lg, &l) != GDK_SUCCEED ||
            !mnstr_writeLng(lg->current->output_log, nr) ||
             mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1){
                ok = GDK_FAIL;
@@ -2874,6 +2900,8 @@ check_rotation_conditions(logger *lg) {
 
        if (lg->current->next)
                return false; /* do not rotate if there is already a prepared 
next current */
+       if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR)
+               return true;
        const lng p = (lng) getfilepos(getFile(lg->current->output_log));
 
        const lng log_large = (ATOMIC_GET(&GDKdebug) & 
FORCEMITOMASK)?LOG_MINI:LOG_LARGE;
@@ -3004,7 +3032,9 @@ log_tsequence_(logger *lg, int seq, lng 
 
        TRC_DEBUG(WAL, "tsequence(%d," LLFMT ")\n", seq, val);
 
-       if (log_write_format(lg, &l) != GDK_SUCCEED ||
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
+           log_write_format(lg, &l) != GDK_SUCCEED ||
            !mnstr_writeLng(lg->current->output_log, val)) {
                TRC_CRITICAL(GDK, "write failed\n");
                ATOMIC_DEC(&lg->current->refcount);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org

Reply via email to