Changeset: fb5c89b34e10 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/fb5c89b34e10
Modified Files:
        gdk/gdk_logger.c
Branch: Jun2023
Log Message:

Try not to subcommit every bat in the catalog every time.


diffs (truncated from 319 to 300 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -94,7 +94,7 @@ typedef struct logformat_t {
 
 typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return;
 
-static gdk_return bm_commit(logger *lg);
+static gdk_return bm_commit(logger *lg, uint32_t *updated, BUN maxupdated);
 static gdk_return tr_grow(trans *tr);
 
 #define log_lock(lg)   MT_lock_set(&(lg)->lock)
@@ -1149,7 +1149,7 @@ log_open_input(logger *lg, const char *f
 }
 
 static log_return
-log_read_transaction(logger *lg)
+log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated)
 {
        logformat l;
        trans *tr = NULL;
@@ -1176,6 +1176,30 @@ log_read_transaction(logger *lg)
                        else
                                TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id);
                }
+               switch (l.flag) {
+               case LOG_UPDATE_CONST:
+               case LOG_UPDATE_BULK:
+               case LOG_UPDATE:
+               case LOG_CREATE:
+               case LOG_DESTROY:
+                       if (updated &&
+                           BAThash(lg->catalog_id) == GDK_SUCCEED) {
+                               BATiter cni = bat_iterator(lg->catalog_id);
+                               BUN p;
+                               MT_rwlock_rdlock(&cni.b->thashlock);
+                               HASHloop_int(cni, cni.b->thash, p, &l.id) {
+                                       assert(p < maxupdated);
+                                       updated[p / 32] |= 1U << (p % 32);
+                                       /* there should only be one hit */
+                                       break;
+                               }
+                               MT_rwlock_rdunlock(&cni.b->thashlock);
+                               bat_iterator_end(&cni);
+                       }
+                       break;
+               default:
+                       /* do nothing */
+               }
                /* the functions we call here can succeed (LOG_OK),
                 * but they can also fail for two different reasons:
                 * they can run out of input (LOG_EOF -- this is not
@@ -1307,7 +1331,7 @@ log_readlog(logger *lg, const char *file
                                }
                        }
                }
-               err = log_read_transaction(lg);
+               err = log_read_transaction(lg, NULL, 0);
        }
        log_close_input(lg);
        lg->input_log = NULL;
@@ -1356,11 +1380,11 @@ log_readlogs(logger *lg, const char *fil
 }
 
 static gdk_return
-log_commit(logger *lg)
+log_commit(logger *lg, uint32_t *updated, BUN maxupdated)
 {
        TRC_DEBUG(WAL, "commit");
 
-       return bm_commit(lg);
+       return bm_commit(lg, updated, maxupdated);
 }
 
 static gdk_return
@@ -1509,7 +1533,7 @@ subcommit_list_add(int next, bat *n, BUN
 }
 
 static int
-cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng 
*cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup)
+cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng 
*cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup, uint32_t 
*updated, BUN maxupdated)
 {
        BAT *nbids, *noids, *ncnts, *nlids, *ndels;
        BUN p, q;
@@ -1519,6 +1543,9 @@ cleanup_and_swap(logger *lg, int *r, con
        BATloop(dcatalog, p, q) {
                oid pos = poss[p];
 
+               if (updated && pos < maxupdated && (updated[pos / 32] & (1U << 
(pos % 32))) == 0) {
+                       continue;
+               }
                if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
                        continue;
 
@@ -1562,7 +1589,7 @@ cleanup_and_swap(logger *lg, int *r, con
 
                /* only project out the deleted with lid == -1
                 * update dcatalog */
-               if (lid == -1)
+               if ((updated == NULL || p >= maxupdated || (updated[p / 32] & 
(1U << (p % 32))) != 0) && lid == -1)
                        continue; /* remove */
 
                if (BUNappend(nbids, &col, false) != GDK_SUCCEED ||
@@ -1570,9 +1597,11 @@ cleanup_and_swap(logger *lg, int *r, con
                    BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
                    BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
                        err=1;
-               pos = (oid)(BATcount(nbids)-1);
-               if (lid != lng_nil && BUNappend(ndels, &pos, false) != 
GDK_SUCCEED)
-                       err=1;
+               if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) {
+                       pos = (oid)(BATcount(nbids)-1);
+                       if (BUNappend(ndels, &pos, false) != GDK_SUCCEED)
+                               err = 1;
+               }
        }
 
        if (err) {
@@ -1601,6 +1630,8 @@ cleanup_and_swap(logger *lg, int *r, con
        r[rcnt++] = lg->catalog_id->batCacheid;
        r[rcnt++] = lg->dcatalog->batCacheid;
 
+       assert(lg->deleted - cleanup == BATcount(ndels));
+
        logbat_destroy(lg->catalog_bid);
        logbat_destroy(lg->catalog_id);
        logbat_destroy(lg->dcatalog);
@@ -1616,14 +1647,13 @@ cleanup_and_swap(logger *lg, int *r, con
        lg->catalog_lid = nlids;
        lg->cnt = BATcount(lg->catalog_bid);
        lg->deleted -= cleanup;
-       assert(lg->deleted == BATcount(lg->dcatalog));
        return rcnt;
 }
 
 /* this function is called with log_lock() held; it releases the lock
  * before returning */
 static gdk_return
-bm_subcommit(logger *lg)
+bm_subcommit(logger *lg, uint32_t *updated, BUN maxupdated)
 {
        BUN p, q;
        BAT *catalog_bid = lg->catalog_bid;
@@ -1656,6 +1686,9 @@ bm_subcommit(logger *lg)
        if (lg->catalog_lid)
                lids = (lng *) Tloc(lg->catalog_lid, 0);
        BATloop(catalog_bid, p, q) {
+               if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 
32))) == 0) {
+                       continue;
+               }
                bat col = bids[p];
 
                if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid)
@@ -1673,7 +1706,7 @@ bm_subcommit(logger *lg)
        sizes[i] = BATcount(dcatalog);
        n[i++] = dcatalog->batCacheid;
 
-       if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, 
catalog_bid, catalog_id, dcatalog, cleanup)) < 0) {
+       if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, 
catalog_bid, catalog_id, dcatalog, cleanup, updated, maxupdated)) < 0) {
                GDKfree(n);
                GDKfree(r);
                GDKfree(sizes);
@@ -1941,7 +1974,7 @@ log_load(const char *fn, const char *log
 
                log_lock(lg);
                /* bm_subcommit releases the lock */
-               if (bm_subcommit(lg) != GDK_SUCCEED) {
+               if (bm_subcommit(lg, NULL, 0) != GDK_SUCCEED) {
                        /* cannot commit catalog, so remove log */
                        MT_remove(filename);
                        BBPrelease(lg->catalog_bid->batCacheid);
@@ -2076,7 +2109,7 @@ log_load(const char *fn, const char *log
        }
        dbg = ATOMIC_GET(&GDKdebug);
        ATOMIC_AND(&GDKdebug, ~CHECKMASK);
-       if (needcommit && bm_commit(lg) != GDK_SUCCEED) {
+       if (needcommit && bm_commit(lg, NULL, 0) != GDK_SUCCEED) {
                GDKerror("Logger_new: commit failed");
                goto error;
        }
@@ -2101,7 +2134,7 @@ log_load(const char *fn, const char *log
                }
                dbg = ATOMIC_GET(&GDKdebug);
                ATOMIC_AND(&GDKdebug, ~CHECKMASK);
-               if (log_commit(lg) != GDK_SUCCEED) {
+               if (log_commit(lg, NULL, 0) != GDK_SUCCEED) {
                        goto error;
                }
                ATOMIC_SET(&GDKdebug, dbg);
@@ -2261,7 +2294,7 @@ log_destroy(logger *lg)
        if (LOG_DISABLED(lg)) {
                lg->saved_id = lg->id;
                lg->saved_tid = lg->tid;
-               log_commit(lg);
+               log_commit(lg, NULL, 0);
        }
        if (lg->catalog_bid) {
                log_lock(lg);
@@ -2406,7 +2439,7 @@ log_flush(logger *lg, ulng ts)
                lg->saved_tid = lg->tid;
                if (lid)
                        log_cleanup_range(lg, lg->saved_id);
-               if (log_commit(lg) != GDK_SUCCEED)
+               if (log_commit(lg, NULL, 0) != GDK_SUCCEED)
                        TRC_ERROR(GDK, "failed to commit");
                return GDK_SUCCEED;
        }
@@ -2420,17 +2453,24 @@ log_flush(logger *lg, ulng ts)
        log_return res = LOG_OK;
        ulng cid = olid;
        assert (lid <= lgid);
+       uint32_t *updated = NULL;
+       BUN nupdated = 0;
+       size_t allocated = 0;
        while(cid < lid && res == LOG_OK) {
                if (!lg->input_log) {
                        char *filename;
                        char id[32];
                        if (snprintf(id, sizeof(id), LLFMT, cid+1) >= (int) 
sizeof(id)) {
+                               GDKfree(updated);
                                TRC_CRITICAL(GDK, "log_id filename is too 
large\n");
                                return GDK_FAIL;
                        }
-                       if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 
0, offheap), lg->dir, LOGFILE, id)) == NULL)
+                       if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 
0, offheap), lg->dir, LOGFILE, id)) == NULL) {
+                               GDKfree(updated);
                                return GDK_FAIL;
+                       }
                        if (strlen(filename) >= FILENAME_MAX) {
+                               GDKfree(updated);
                                GDKerror("Logger filename path is too large\n");
                                GDKfree(filename);
                                return GDK_FAIL;
@@ -2438,6 +2478,7 @@ log_flush(logger *lg, ulng ts)
 
                        bool filemissing = false;
                        if (log_open_input(lg, filename, &filemissing) != 
GDK_SUCCEED) {
+                               GDKfree(updated);
                                GDKfree(filename);
                                return GDK_FAIL;
                        }
@@ -2445,8 +2486,32 @@ log_flush(logger *lg, ulng ts)
                }
                /* we read the full file because skipping is impossible with 
current log format */
                log_lock(lg);
+               if (updated == NULL) {
+                       nupdated = BATcount(lg->catalog_id);
+                       allocated = ((nupdated + 31) & ~31) / 8;
+                       updated = GDKzalloc(allocated);
+                       if (updated == NULL) {
+                               log_unlock(lg);
+                               return GDK_FAIL;
+                       }
+               } else if (nupdated < BATcount(lg->catalog_id)) {
+                       BUN n = BATcount(lg->catalog_id);
+                       size_t a = ((n + 31) & ~31) / 8;
+                       if (a > allocated) {
+                               uint32_t *p = GDKrealloc(updated, a);
+                               if (p == NULL) {
+                                       GDKfree(updated);
+                                       log_unlock(lg);
+                                       return GDK_FAIL;
+                               }
+                               memset(updated + allocated / 4, 0, a - 
allocated);
+                               allocated = a;
+                               updated = p;
+                       }
+                       nupdated = n;
+               }
                lg->flushing = true;
-               res = log_read_transaction(lg);
+               res = log_read_transaction(lg, updated, nupdated);
                lg->flushing = false;
                log_unlock(lg);
                if (res == LOG_EOF) {
@@ -2459,7 +2524,7 @@ log_flush(logger *lg, ulng ts)
                rotation_lock(lg); /* protect against concurrent log_tflush 
rotate check */
                lg->saved_id = lid;
                rotation_unlock(lg);
-               if (log_commit(lg) != GDK_SUCCEED) {
+               if (log_commit(lg, updated, nupdated) != GDK_SUCCEED) {
                        TRC_ERROR(GDK, "failed to commit");
                        res = LOG_ERR;
                        rotation_lock(lg);
@@ -2476,6 +2541,7 @@ log_flush(logger *lg, ulng ts)
                if (res == LOG_OK)
                        log_cleanup_range(lg, lg->saved_id);
        }
+       GDKfree(updated);
        return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
 }
 
@@ -2982,7 +3048,7 @@ log_tflush(logger* lg, ulng file_id, uln
                do_rotate(lg);
                (void) do_flush_range_cleanup(lg);
                assert(lg->flush_ranges == lg->current);
-               return log_commit(lg);
+               return log_commit(lg, NULL, 0);
        }
 
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to