Changeset: fb5c89b34e10 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/fb5c89b34e10 Modified Files: gdk/gdk_logger.c Branch: Jun2023 Log Message:
Try not to subcommit every bat in the catalog every time. diffs (truncated from 319 to 300 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -94,7 +94,7 @@ typedef struct logformat_t { typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return; -static gdk_return bm_commit(logger *lg); +static gdk_return bm_commit(logger *lg, uint32_t *updated, BUN maxupdated); static gdk_return tr_grow(trans *tr); #define log_lock(lg) MT_lock_set(&(lg)->lock) @@ -1149,7 +1149,7 @@ log_open_input(logger *lg, const char *f } static log_return -log_read_transaction(logger *lg) +log_read_transaction(logger *lg, uint32_t *updated, BUN maxupdated) { logformat l; trans *tr = NULL; @@ -1176,6 +1176,30 @@ log_read_transaction(logger *lg) else TRC_DEBUG_ENDIF(WAL, "%d %d", l.flag, l.id); } + switch (l.flag) { + case LOG_UPDATE_CONST: + case LOG_UPDATE_BULK: + case LOG_UPDATE: + case LOG_CREATE: + case LOG_DESTROY: + if (updated && + BAThash(lg->catalog_id) == GDK_SUCCEED) { + BATiter cni = bat_iterator(lg->catalog_id); + BUN p; + MT_rwlock_rdlock(&cni.b->thashlock); + HASHloop_int(cni, cni.b->thash, p, &l.id) { + assert(p < maxupdated); + updated[p / 32] |= 1U << (p % 32); + /* there should only be one hit */ + break; + } + MT_rwlock_rdunlock(&cni.b->thashlock); + bat_iterator_end(&cni); + } + break; + default: + /* do nothing */ + } /* the functions we call here can succeed (LOG_OK), * but they can also fail for two different reasons: * they can run out of input (LOG_EOF -- this is not @@ -1307,7 +1331,7 @@ log_readlog(logger *lg, const char *file } } } - err = log_read_transaction(lg); + err = log_read_transaction(lg, NULL, 0); } log_close_input(lg); lg->input_log = NULL; @@ -1356,11 +1380,11 @@ log_readlogs(logger *lg, const char *fil } static gdk_return -log_commit(logger *lg) +log_commit(logger *lg, uint32_t *updated, BUN maxupdated) { TRC_DEBUG(WAL, "commit"); - return bm_commit(lg); + return bm_commit(lg, updated, maxupdated); } static 
gdk_return @@ -1509,7 +1533,7 @@ subcommit_list_add(int next, bat *n, BUN } static int -cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup) +cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng *cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, BUN cleanup, uint32_t *updated, BUN maxupdated) { BAT *nbids, *noids, *ncnts, *nlids, *ndels; BUN p, q; @@ -1519,6 +1543,9 @@ cleanup_and_swap(logger *lg, int *r, con BATloop(dcatalog, p, q) { oid pos = poss[p]; + if (updated && pos < maxupdated && (updated[pos / 32] & (1U << (pos % 32))) == 0) { + continue; + } if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid) continue; @@ -1562,7 +1589,7 @@ cleanup_and_swap(logger *lg, int *r, con /* only project out the deleted with lid == -1 * update dcatalog */ - if (lid == -1) + if ((updated == NULL || p >= maxupdated || (updated[p / 32] & (1U << (p % 32))) != 0) && lid == -1) continue; /* remove */ if (BUNappend(nbids, &col, false) != GDK_SUCCEED || @@ -1570,9 +1597,11 @@ cleanup_and_swap(logger *lg, int *r, con BUNappend(nlids, &lid, false) != GDK_SUCCEED || BUNappend(ncnts, &cnt, false) != GDK_SUCCEED) err=1; - pos = (oid)(BATcount(nbids)-1); - if (lid != lng_nil && BUNappend(ndels, &pos, false) != GDK_SUCCEED) - err=1; + if (BUNfnd(lg->dcatalog, &pos) != BUN_NONE) { + pos = (oid)(BATcount(nbids)-1); + if (BUNappend(ndels, &pos, false) != GDK_SUCCEED) + err = 1; + } } if (err) { @@ -1601,6 +1630,8 @@ cleanup_and_swap(logger *lg, int *r, con r[rcnt++] = lg->catalog_id->batCacheid; r[rcnt++] = lg->dcatalog->batCacheid; + assert(lg->deleted - cleanup == BATcount(ndels)); + logbat_destroy(lg->catalog_bid); logbat_destroy(lg->catalog_id); logbat_destroy(lg->dcatalog); @@ -1616,14 +1647,13 @@ cleanup_and_swap(logger *lg, int *r, con lg->catalog_lid = nlids; lg->cnt = BATcount(lg->catalog_bid); lg->deleted -= cleanup; - assert(lg->deleted == BATcount(lg->dcatalog)); 
return rcnt; } /* this function is called with log_lock() held; it releases the lock * before returning */ static gdk_return -bm_subcommit(logger *lg) +bm_subcommit(logger *lg, uint32_t *updated, BUN maxupdated) { BUN p, q; BAT *catalog_bid = lg->catalog_bid; @@ -1656,6 +1686,9 @@ bm_subcommit(logger *lg) if (lg->catalog_lid) lids = (lng *) Tloc(lg->catalog_lid, 0); BATloop(catalog_bid, p, q) { + if (updated && p < maxupdated && (updated[p / 32] & (1U << (p % 32))) == 0) { + continue; + } bat col = bids[p]; if (lids && lids[p] != lng_nil && lids[p] <= lg->saved_tid) @@ -1673,7 +1706,7 @@ bm_subcommit(logger *lg) sizes[i] = BATcount(dcatalog); n[i++] = dcatalog->batCacheid; - if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, catalog_id, dcatalog, cleanup)) < 0) { + if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, catalog_bid, catalog_id, dcatalog, cleanup, updated, maxupdated)) < 0) { GDKfree(n); GDKfree(r); GDKfree(sizes); @@ -1941,7 +1974,7 @@ log_load(const char *fn, const char *log log_lock(lg); /* bm_subcommit releases the lock */ - if (bm_subcommit(lg) != GDK_SUCCEED) { + if (bm_subcommit(lg, NULL, 0) != GDK_SUCCEED) { /* cannot commit catalog, so remove log */ MT_remove(filename); BBPrelease(lg->catalog_bid->batCacheid); @@ -2076,7 +2109,7 @@ log_load(const char *fn, const char *log } dbg = ATOMIC_GET(&GDKdebug); ATOMIC_AND(&GDKdebug, ~CHECKMASK); - if (needcommit && bm_commit(lg) != GDK_SUCCEED) { + if (needcommit && bm_commit(lg, NULL, 0) != GDK_SUCCEED) { GDKerror("Logger_new: commit failed"); goto error; } @@ -2101,7 +2134,7 @@ log_load(const char *fn, const char *log } dbg = ATOMIC_GET(&GDKdebug); ATOMIC_AND(&GDKdebug, ~CHECKMASK); - if (log_commit(lg) != GDK_SUCCEED) { + if (log_commit(lg, NULL, 0) != GDK_SUCCEED) { goto error; } ATOMIC_SET(&GDKdebug, dbg); @@ -2261,7 +2294,7 @@ log_destroy(logger *lg) if (LOG_DISABLED(lg)) { lg->saved_id = lg->id; lg->saved_tid = lg->tid; - log_commit(lg); + log_commit(lg, NULL, 
0); } if (lg->catalog_bid) { log_lock(lg); @@ -2406,7 +2439,7 @@ log_flush(logger *lg, ulng ts) lg->saved_tid = lg->tid; if (lid) log_cleanup_range(lg, lg->saved_id); - if (log_commit(lg) != GDK_SUCCEED) + if (log_commit(lg, NULL, 0) != GDK_SUCCEED) TRC_ERROR(GDK, "failed to commit"); return GDK_SUCCEED; } @@ -2420,17 +2453,24 @@ log_flush(logger *lg, ulng ts) log_return res = LOG_OK; ulng cid = olid; assert (lid <= lgid); + uint32_t *updated = NULL; + BUN nupdated = 0; + size_t allocated = 0; while(cid < lid && res == LOG_OK) { if (!lg->input_log) { char *filename; char id[32]; if (snprintf(id, sizeof(id), LLFMT, cid+1) >= (int) sizeof(id)) { + GDKfree(updated); TRC_CRITICAL(GDK, "log_id filename is too large\n"); return GDK_FAIL; } - if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) + if ((filename = GDKfilepath(BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id)) == NULL) { + GDKfree(updated); return GDK_FAIL; + } if (strlen(filename) >= FILENAME_MAX) { + GDKfree(updated); GDKerror("Logger filename path is too large\n"); GDKfree(filename); return GDK_FAIL; @@ -2438,6 +2478,7 @@ log_flush(logger *lg, ulng ts) bool filemissing = false; if (log_open_input(lg, filename, &filemissing) != GDK_SUCCEED) { + GDKfree(updated); GDKfree(filename); return GDK_FAIL; } @@ -2445,8 +2486,32 @@ log_flush(logger *lg, ulng ts) } /* we read the full file because skipping is impossible with current log format */ log_lock(lg); + if (updated == NULL) { + nupdated = BATcount(lg->catalog_id); + allocated = ((nupdated + 31) & ~31) / 8; + updated = GDKzalloc(allocated); + if (updated == NULL) { + log_unlock(lg); + return GDK_FAIL; + } + } else if (nupdated < BATcount(lg->catalog_id)) { + BUN n = BATcount(lg->catalog_id); + size_t a = ((n + 31) & ~31) / 8; + if (a > allocated) { + uint32_t *p = GDKrealloc(updated, a); + if (p == NULL) { + GDKfree(updated); + log_unlock(lg); + return GDK_FAIL; + } + memset(updated + allocated / 4, 0, 
a - allocated); + allocated = a; + updated = p; + } + nupdated = n; + } lg->flushing = true; - res = log_read_transaction(lg); + res = log_read_transaction(lg, updated, nupdated); lg->flushing = false; log_unlock(lg); if (res == LOG_EOF) { @@ -2459,7 +2524,7 @@ log_flush(logger *lg, ulng ts) rotation_lock(lg); /* protect against concurrent log_tflush rotate check */ lg->saved_id = lid; rotation_unlock(lg); - if (log_commit(lg) != GDK_SUCCEED) { + if (log_commit(lg, updated, nupdated) != GDK_SUCCEED) { TRC_ERROR(GDK, "failed to commit"); res = LOG_ERR; rotation_lock(lg); @@ -2476,6 +2541,7 @@ log_flush(logger *lg, ulng ts) if (res == LOG_OK) log_cleanup_range(lg, lg->saved_id); } + GDKfree(updated); return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED; } @@ -2982,7 +3048,7 @@ log_tflush(logger* lg, ulng file_id, uln do_rotate(lg); (void) do_flush_range_cleanup(lg); assert(lg->flush_ranges == lg->current); - return log_commit(lg); + return log_commit(lg, NULL, 0); } _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org