On Mon, Jul 29, 2019 at 12:58:17PM -0400, Tom Lane wrote:
> > On Sun, Feb 17, 2019 at 11:31:03PM -0800, Noah Misch wrote:
> >>> b. Arrange so only one backend runs vac_truncate_clog() at a time.  Other than
> >>> AsyncCtl, every SLRU truncation appears in vac_truncate_clog(), in a
> >>> checkpoint, or in the startup process.  Hence, also arrange for only one
> >>> backend to call SimpleLruTruncate(AsyncCtl) at a time.
> 
> >> More specifically, restrict vac_update_datfrozenxid() to one backend per
> >> database, and restrict vac_truncate_clog() and asyncQueueAdvanceTail() to one
> >> backend per cluster.  This, attached, was rather straightforward.

> I'm pretty sure it was intentional that multiple backends
> could run truncation directory scans concurrently, and I don't really
> want to give that up if possible.

I want to avoid a design that requires painstaking concurrency analysis.  Such
analysis is worth it for heap_update(), but vac_truncate_clog() isn't enough
of a hot path.  If there's a way to make vac_truncate_clog() easy to analyze
and still somewhat concurrent, fair.

> Also, if I understand the data-loss hazard properly, it's what you
> said in the other thread: the latest_page_number could advance after
> we make our decision about what to truncate, and then maybe we could
> truncate newly-added data.  We surely don't want to lock out the
> operations that can advance last_page_number, so how does serializing
> vac_truncate_clog help?
> 
> I wonder whether what we need to do is add some state in shared
> memory saying "it is only safe to create pages before here", and
> make SimpleLruZeroPage or its callers check that.  The truncation
> logic would advance that value, but only after completing a scan.
> (Oh ..., hmm, maybe the point of serializing truncations is to
> ensure that we know that we can safely advance that?) 

vac_truncate_clog() already records "it is only safe to create pages before
here" in ShmemVariableCache->xidWrapLimit, which it updates after any unlinks.
The trouble comes when two vac_truncate_clog() invocations run in parallel and you get a
sequence of events like this:

vac_truncate_clog() instance 1 starts, considers segment ABCD eligible to unlink
vac_truncate_clog() instance 2 starts, considers segment ABCD eligible to unlink
vac_truncate_clog() instance 1 unlinks segment ABCD
vac_truncate_clog() instance 1 calls SetTransactionIdLimit()
vac_truncate_clog() instance 1 finishes
some backend calls SimpleLruZeroPage(), creating segment ABCD
vac_truncate_clog() instance 2 unlinks segment ABCD

Serializing vac_truncate_clog() fixes that.
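
To sketch the shape of serialization I have in mind (this is not the attached
patch; "VacTruncateLock" is a placeholder name that would need an entry in
lwlocknames.txt):

        /* On entry to vac_truncate_clog(), before any cutoff decision: */
        if (!LWLockConditionalAcquire(VacTruncateLock, LW_EXCLUSIVE))
                return;         /* someone else is truncating; skipping is harmless */

        /*
         * ... existing body: pg_database scan, TruncateCLOG(),
         * TruncateCommitTs(), TruncateMultiXact(), then SetTransactionIdLimit() ...
         */

        LWLockRelease(VacTruncateLock);

With the lock held from before the cutoff decision until after the wrap limit
advances, an invocation that arrives mid-truncation simply skips, and one that
arrives later makes its decisions only after the first has finished both its
unlinks and its SetTransactionIdLimit() call, so the interleaving above can't
occur.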

> Can you post whatever script you used to detect/reproduce the problem
> in the first place?

I've attached it, but it's pretty hackish.  Apply the patch on commit 7170268,
make sure your --bindir is in $PATH, copy "conf-test-pg" to your home
directory, and run "make trunc-check".  This incorporates xid-burn
acceleration code that Jeff Janes shared with -hackers at some point.

> PS: also, re-reading this code, it's worrisome that we are not checking
> for failure of the unlink calls.  I think the idea was that it didn't
> really matter because if we did re-use an existing file we'd just
> re-zero it; hence failing the truncate is an overreaction.  But probably
> some comments about that are in order.

That's my understanding.  We'll zero any page before reusing it.  Failing the
whole vac_truncate_clog() (and therefore not advancing the wrap limit) would
do more harm than the bit of wasted disk space.  Still, a LOG or WARNING
message would be fine, I think.
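
If we add such a message, the change is small.  Roughly, in
SlruInternalDeleteSegment() (a sketch only, untested; it ignores ENOENT, since
the file already being gone is exactly the harmless case):

        snprintf(path, MAXPGPATH, "%s/%s", ctl->Dir, filename);
        ereport(DEBUG2,
                        (errmsg("removing file \"%s\"", path)));
        if (unlink(path) != 0 && errno != ENOENT)
                ereport(LOG,
                                (errcode_for_file_access(),
                                 errmsg("could not remove file \"%s\": %m", path)));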

Thanks,
nm
diff --git a/GNUmakefile.in b/GNUmakefile.in
index f4e31a7..284fba1 100644
--- a/GNUmakefile.in
+++ b/GNUmakefile.in
@@ -67,6 +67,20 @@ check check-tests installcheck installcheck-parallel installcheck-tests: CHECKPR
 check check-tests installcheck installcheck-parallel installcheck-tests: submake-generated-headers
        $(MAKE) -C src/test/regress $@
 
+TEST_PGDATA_DATA=/var/tmp/test-pg
+trunc-check:
+       make -j20 install PROFILE=-O0
+       $(CC) -g -Wall -I`pg_config --includedir` -L`pg_config --libdir` -Wl,-R`pg_config --libdir` trunc-clog-concurrency.c -lpq -o trunc-clog-concurrency-tester
+       pg_ctl -m fast -w stop ||:
+       rm -rf $(TEST_PGDATA_DATA)
+       initdb -N $(TEST_PGDATA_DATA)
+       cp -p ~/conf-test-pg $(TEST_PGDATA_DATA)/postgresql.conf
+       echo "default_transaction_isolation = 'read committed'" >>$(TEST_PGDATA_DATA)/postgresql.conf
+       pg_ctl -w start
+       createdb
+       psql -Xc 'alter database template0 allow_connections on'
+       env time ./trunc-clog-concurrency-tester
+
 $(call recurse,check-world,src/test src/pl src/interfaces/ecpg contrib src/bin,check)
 $(call recurse,checkprep,  src/test src/pl src/interfaces/ecpg contrib src/bin)
 
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index aa089d8..7202f34 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -42,6 +42,7 @@
 #include "pgstat.h"
 #include "pg_trace.h"
 #include "storage/proc.h"
+#include "utils/fmgrprotos.h"
 
 /*
  * Defines for CLOG page sizes.  A page is the same BLCKSZ as is used
@@ -917,6 +918,22 @@ TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
        if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
                return;                                 /* nothing to remove */
 
+#if 0
+       /* FIXME Move sleep duration into a GUC? */
+       if (LWLockConditionalAcquire(TruncSleepLock, LW_EXCLUSIVE))
+       {
+               elog(LOG, "TruncSleepLock taken: sleeping (%d for %u)",
+                        cutoffPage, oldestXact);
+               DirectFunctionCall1(pg_sleep, Float8GetDatum(10.0));
+               /* TODO increase time, attach debugger and check caller vars */
+               LWLockRelease(TruncSleepLock);
+               elog(LOG, "TruncSleepLock done: proceeding");
+       }
+       else
+               elog(LOG, "TruncSleepLock unavailable: proceeding (%d for %u)",
+                        cutoffPage, oldestXact);
+#endif
+
        /*
         * Advance oldestClogXid before truncating clog, so concurrent xact status
         * lookups can ensure they don't attempt to access truncated-away clog.
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 3623352..ab28eda 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -57,6 +57,7 @@
 #include "pgstat.h"
 #include "storage/fd.h"
 #include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
 #include "miscadmin.h"
 
 
@@ -1171,11 +1172,6 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
        int                     slotno;
 
        /*
-        * The cutoff point is the start of the segment containing cutoffPage.
-        */
-       cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
-
-       /*
         * Scan shared memory and remove any pages preceding the cutoff page, to
         * ensure we won't rewrite them later.  (Since this is normally called in
         * or just after a checkpoint, any dirty pages should have been flushed
@@ -1191,6 +1187,21 @@ restart:;
         * have already wrapped around, and proceeding with the truncation would
         * risk removing the current segment.
         */
+       if (shared->ControlLock == CLogControlLock)
+       {
+               int test = shared->latest_page_number;
+               elog(WARNING, "important safety check: %d latest < %d cutoff?",
+                        shared->latest_page_number, cutoffPage);
+               while (test < 130000)
+               {
+                       if (ctl->PagePrecedes(test, cutoffPage))
+                       {
+                               elog(WARNING, "safety check would trip at %d", test);
+                               break;
+                       }
+                       test++;
+               }
+       }
        if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
        {
                LWLockRelease(shared->ControlLock);
@@ -1233,6 +1244,29 @@ restart:;
 
        LWLockRelease(shared->ControlLock);
 
+#if 1
+       if (shared->ControlLock == CLogControlLock)
+       {
+               /* FIXME Move sleep duration into a GUC? */
+               if (LWLockConditionalAcquire(TruncSleepLock, LW_EXCLUSIVE))
+               {
+                       elog(LOG, "TruncSleepLock taken: sleeping (%d)",
+                                cutoffPage);
+                       DirectFunctionCall1(pg_sleep, Float8GetDatum(10.0));
+                       /* TODO increase time, attach debugger and check caller vars */
+                       LWLockRelease(TruncSleepLock);
+                       elog(LOG, "TruncSleepLock done: proceeding");
+               }
+               else
+                       elog(LOG, "TruncSleepLock unavailable: proceeding (%d)",
+                                cutoffPage);
+       }
+#endif
+
+       if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
+               ereport(LOG,
+                               (errmsg("too late, but apparent wraparound")));
+
        /* Now we can remove the old segment(s) */
        (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
 }
@@ -1320,11 +1354,10 @@ restart:
 bool
 SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data)
 {
+       int                     seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
        int                     cutoffPage = *(int *) data;
 
-       cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
-
-       if (ctl->PagePrecedes(segpage, cutoffPage))
+       if (ctl->PagePrecedes(seg_last_page, cutoffPage))
                return true;                    /* found one; don't iterate any more */
 
        return false;                           /* keep going */
@@ -1337,9 +1370,10 @@ SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data
 static bool
 SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data)
 {
+       int                     seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
        int                     cutoffPage = *(int *) data;
 
-       if (ctl->PagePrecedes(segpage, cutoffPage))
+       if (ctl->PagePrecedes(seg_last_page, cutoffPage))
                SlruInternalDeleteSegment(ctl, filename);
 
        return false;                           /* keep going */
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index fe94fda..37db959 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -33,6 +33,8 @@
 /* pointer to "variable cache" in shared memory (set up by shmem.c) */
 VariableCache ShmemVariableCache = NULL;
 
+int JJ_xid=0;
+
 
 /*
  * Allocate the next XID for a new transaction or subtransaction.
@@ -168,6 +170,11 @@ GetNewTransactionId(bool isSubXact)
         *
         * Extend pg_subtrans and pg_commit_ts too.
         */
+       {
+       int             incr;
+       for (incr=0; incr <=JJ_xid; incr++)
+       {
+       xid = ShmemVariableCache->nextXid;
        ExtendCLOG(xid);
        ExtendCommitTs(xid);
        ExtendSUBTRANS(xid);
@@ -180,6 +187,13 @@ GetNewTransactionId(bool isSubXact)
         */
        TransactionIdAdvance(ShmemVariableCache->nextXid);
 
+       /* If JJ_xid opposes xidStopLimit, the latter wins */
+       if (TransactionIdFollowsOrEquals(ShmemVariableCache->nextXid,
+                                                                        ShmemVariableCache->xidStopLimit))
+               break;
+       }
+       }
+
        /*
         * We must store the new XID into the shared ProcArray before releasing
         * XidGenLock.  This ensures that every active XID older than
@@ -302,9 +316,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
         * We'll refuse to continue assigning XIDs in interactive mode once we get
         * within 1M transactions of data loss.  This leaves lots of room for the
         * DBA to fool around fixing things in a standalone backend, while not
-        * being significant compared to total XID space. (Note that since
-        * vacuuming requires one transaction per table cleaned, we had better be
-        * sure there's lots of XIDs left...)
+        * being significant compared to total XID space.
         */
        xidStopLimit = xidWrapLimit - 1000000;
        if (xidStopLimit < FirstNormalTransactionId)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a707d4d..fa7ab6a 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5143,7 +5143,7 @@ sigusr1_handler(SIGNAL_ARGS)
                 * that by launching another iteration as soon as the current one
                 * completes.
                 */
-               start_autovac_launcher = true;
+               /* start_autovac_launcher = true; */
        }
 
        if (CheckPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER) &&
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index db47843..335521c 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -49,3 +49,4 @@ MultiXactTruncationLock                               41
 OldSnapshotTimeMapLock                         42
 LogicalRepWorkerLock                           43
 CLogTruncationLock                                     44
+TruncSleepLock                                         45
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f81e042..4e59a68 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -116,6 +116,7 @@
 /* XXX these should appear in other modules' header files */
 extern bool Log_disconnections;
 extern int     CommitDelay;
+extern int     JJ_xid;
 extern int     CommitSiblings;
 extern char *default_tablespace;
 extern char *temp_tablespaces;
@@ -2617,6 +2618,15 @@ static struct config_int ConfigureNamesInt[] =
        },
 
        {
+               {"JJ_xid", PGC_USERSET, WAL_SETTINGS,
+                       gettext_noop("Skip this many xid every time we acquire one"),
+                       NULL
+               },
+               &JJ_xid,
+               0, 0, 1000000, NULL, NULL
+       },
+
+       {
                {"commit_siblings", PGC_USERSET, WAL_SETTINGS,
                        gettext_noop("Sets the minimum concurrent open 
transactions before performing "
                                                 "commit_delay."),
diff --git a/trunc-clog-concurrency.c b/trunc-clog-concurrency.c
new file mode 100644
index 0000000..2c35dd9
--- /dev/null
+++ b/trunc-clog-concurrency.c
@@ -0,0 +1,178 @@
+#include <libpq-fe.h>
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+static void
+report_query_failure(const char *query, PGresult *res)
+{
+       fprintf(stderr, "query \"%s\" failed unexpectedly: %s",
+                       query, PQresultErrorMessage(res));
+}
+
+static void
+safe_query(PGconn *conn, const char *query)
+{
+       PGresult *res;
+
+       res = PQexec(conn, query);
+       if (PQresultStatus(res) != PGRES_TUPLES_OK &&
+               PQresultStatus(res) != PGRES_COMMAND_OK)
+       {
+               report_query_failure(query, res);
+               exit(1);
+       }
+       PQclear(res);
+}
+
+static int
+is_stop_limit(PGresult *res)
+{
+       return PQresultStatus(res) == PGRES_FATAL_ERROR
+               && strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "54000") == 0;
+}
+
+int
+main(int argc, char **argv)
+{
+       bool reached_stop_limit = false;
+       PGconn *mutate_conn, *hold1_conn, *hold2_conn, *burn_conn;
+       PGresult *res;
+       int n_burns = 0, n_inserts = 0;
+
+       if (argc != 1)
+       {
+               fputs("Usage: trunc-clog-concurrency\n", stderr);
+               return 1;
+       }
+
+       mutate_conn = PQconnectdb("");
+       if (PQstatus(mutate_conn) != CONNECTION_OK)
+       {
+               fprintf(stderr, "PGconnectdb failed: %s", PQerrorMessage(mutate_conn));
+               return 1;
+       }
+
+       hold1_conn = PQconnectdb("");
+       if (PQstatus(hold1_conn) != CONNECTION_OK)
+       {
+               fprintf(stderr, "PGconnectdb failed: %s", PQerrorMessage(hold1_conn));
+               return 1;
+       }
+
+       hold2_conn = PQconnectdb("");
+       if (PQstatus(hold2_conn) != CONNECTION_OK)
+       {
+               fprintf(stderr, "PGconnectdb failed: %s", PQerrorMessage(hold2_conn));
+               return 1;
+       }
+
+       burn_conn = PQconnectdb("options=--JJ_xid=1000000");
+       if (PQstatus(burn_conn) != CONNECTION_OK)
+       {
+               fprintf(stderr, "PGconnectdb failed: %s", PQerrorMessage(burn_conn));
+               return 1;
+       }
+
+       /* Start a transaction having an xid. */
+       safe_query(mutate_conn, "BEGIN;");
+       safe_query(mutate_conn, "DROP TABLE IF EXISTS trunc_clog_concurrency;");
+       safe_query(mutate_conn, "CREATE TABLE trunc_clog_concurrency ();");
+
+       /* Burn the entire XID space. */
+       while (!reached_stop_limit)
+       {
+               const char query[] = "SELECT txid_current();";
+               res = PQexec(burn_conn, query);
+               if (PQresultStatus(res) == PGRES_TUPLES_OK)
+               {
+                       ++n_burns;
+                       if (n_burns == 2)
+                       {
+                               safe_query(hold1_conn, "BEGIN ISOLATION LEVEL READ COMMITTED;");
+                               safe_query(hold1_conn, "CREATE TABLE trunc_clog_concurrency_hold1 ();");
+                       }
+                       if (n_burns == 10)
+                       {
+                               safe_query(hold2_conn, "BEGIN ISOLATION LEVEL READ COMMITTED;");
+                               safe_query(hold2_conn, "CREATE TABLE trunc_clog_concurrency_hold2 ();");
+                               system("psql -Xc 'select state, backend_xid, backend_xmin, query from pg_stat_activity'");
+                               system("psql -Xc 'select datname,datallowconn,datfrozenxid from pg_database'");
+                       }
+                       /* keep burning */;
+               }
+               else if (is_stop_limit(res))
+               {
+                       reached_stop_limit = true;
+                       fprintf(stderr, "reached stop limit: %s",
+                                       PQresultErrorMessage(res));
+               }
+               else
+               {
+                       reached_stop_limit = true; /* FIXME not really */
+                       report_query_failure(query, res);
+               }
+               PQclear(res);
+       }
+
+       /* Finish the first transaction.  xmin raises from start to start+2M. */
+       safe_query(mutate_conn, "COMMIT;");
+
+       /* Raise datfrozenxid of all but template1 to start+2M.  No truncation. */
+       system("for d in postgres template0 test; do vacuumdb -F $d; done; "
+                  "echo -n 'DONE1 '; date");
+       /* Raise xmin to start+10M */
+       safe_query(hold1_conn, "COMMIT;");
+       /* Sleep on lock before truncating to start+2M. */
+       system("(vacuumdb -F template1; echo -n 'DONE2 '; date) &");
+       usleep(4000*1000); /* 4s */
+
+       /* Truncate to start+10M. */
+       system("(vacuumdb -aF; echo -n 'DONE3 '; date)");
+       system("psql -Xc 'select state, backend_xid, backend_xmin, query from pg_stat_activity'");
+       system("psql -Xc 'select datname,datallowconn,datfrozenxid from pg_database'");
+
+       /*
+        * We want to burn at least 1M xids (the amount protected by xidStopLimit)
+        * but not more than 200M (autovacuum_freeze_max_age default) to avoid a
+        * second set of VACUUMs.
+        */
+       while (n_inserts < 150)
+       {
+               const char query[] =
+                       "INSERT INTO trunc_clog_concurrency DEFAULT VALUES";
+               res = PQexec(burn_conn, query);
+               if (PQresultStatus(res) == PGRES_COMMAND_OK)
+               {
+                       n_inserts++;
+                       fprintf(stderr, "insert %d ", n_inserts);
+                       system("date >&2");
+               }
+               else if (is_stop_limit(res))
+               {
+                       fprintf(stderr, "reached stop limit: %s",
+                                       PQresultErrorMessage(res));
+                       break;
+               }
+               else
+               {
+                       report_query_failure(query, res);
+                       return 1;
+               }
+               PQclear(res);
+       }
+
+       system("psql -Xc 'select state, backend_xid, backend_xmin, query from pg_stat_activity'");
+       system("psql -Xc 'select datname,datallowconn,datfrozenxid from pg_database'");
+
+       PQfinish(mutate_conn);
+       PQfinish(hold1_conn);
+       PQfinish(hold2_conn);
+       PQfinish(burn_conn);
+
+       return 0;
+}
## postgresql.conf for testing; pgdev_initdb copies it into the data directory
## after each initdb.  This is intended to work on 8.4 and later.

max_connections = 40
shared_buffers = 24MB
wal_buffers = 16MB
autovacuum = off
datestyle = 'iso, mdy'
deadlock_timeout = 20ms
default_text_search_config = 'pg_catalog.english'
fsync = off
full_page_writes = off
listen_addresses = ''
default_transaction_isolation = serializable

lc_messages = 'en_US.UTF-8'
lc_monetary = 'en_US.UTF-8'
lc_numeric = 'en_US.UTF-8'
lc_time = 'en_US.UTF-8'

log_directory = '..'
log_filename = 'pgdev.log'
log_line_prefix = '%p %m '
log_min_messages = debug1
log_statement = all
log_connections = on
log_disconnections = on
logging_collector = on

# I'd leave this on, but wal_level=minimal is nicer for speed.
#wal_level = hot_standby
#max_wal_senders = 5
