On Fri, Aug 8, 2014 at 11:43 PM, Fujii Masao <masao.fu...@gmail.com> wrote:
> On Mon, Apr 14, 2014 at 11:40 PM, Tom Lane <t...@sss.pgh.pa.us> wrote:
>> Fujii Masao <masao.fu...@gmail.com> writes:
>>> On Tue, Apr 1, 2014 at 1:41 AM, Robert Haas <robertmh...@gmail.com> wrote:
>>>> Should we try to install some hack around fastupdate for 9.4? I fear
>>>> the divergence between reasonable values of work_mem and reasonable
>>>> sizes for that list is only going to continue to get bigger. I'm sure
>>>> there's somebody out there who has work_mem = 16GB, and stuff like
>>>> 263865a48973767ce8ed7b7788059a38a24a9f37 is only going to increase the
>>>> appeal of large values.
>>
>>> Controlling the threshold of the size of pending list only by GUC doesn't
>>> seem reasonable. Users may want to increase the threshold only for the
>>> GIN index which can be updated heavily, and decrease it otherwise. So
>>> I think that it's better to add new storage parameter for GIN index to
>>> control the threshold, or both storage parameter and GUC.
>>
>> Yeah, -1 for a GUC. A GIN-index-specific storage parameter seems more
>> appropriate.
>
> The attached patch introduces the GIN index storage parameter
> "PENDING_LIST_CLEANUP_SIZE" which specifies the maximum size of
> GIN pending list. If it's not set, work_mem is used as that maximum size,
> instead. So this patch doesn't break the existing application which
> currently uses work_mem as the threshold of cleanup operation of
> the pending list. It has only not to set PENDING_LIST_CLEANUP_SIZE.
>
> This is an index storage parameter, and allows us to specify each
> threshold per GIN index. So, for example, we can increase the threshold
> only for the GIN index which can be updated heavily, and decrease it
> otherwise.
>
> This patch uses another patch that I proposed (*1) as an infrastructure.
> Please apply that infrastructure patch first if you apply this patch.
>
> (*1)
> http://www.postgresql.org/message-id/CAHGQGwEanQ_e8WLHL25=bm_8z5zkyzw0k0yir+kdmv2hgne...@mail.gmail.com
>
> Regards,
>
> --
> Fujii Masao
Sorry, I forgot to attach the patch. This time, it's attached.

Regards,

--
Fujii Masao
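The cleanup decision described in the proposal above, and implemented by the ginfast.c hunk of the patch below, can be sketched in isolation as follows. This is a standalone illustration, not PostgreSQL code: effective_cleanup_kb(), needs_cleanup() and both macros are hypothetical names, and the 8160 usable bytes per pending-list page is an assumed figure for a default 8 kB block. As in the patch, a reloption value of -1 means "not set" and the threshold falls back to work_mem, both measured in kilobytes.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-ins (assumptions, not PostgreSQL definitions).
 * -1 mirrors GIN_DEFAULT_PENDING_LIST_CLEANUP_SIZE ("reloption not set");
 * 8160 bytes approximates the free space of a default 8 kB GIN page.
 */
#define CLEANUP_SIZE_UNSET (-1)
#define ASSUMED_GIN_PAGE_FREESIZE 8160L

/* Cleanup threshold in kB: the per-index reloption if set, else work_mem. */
int
effective_cleanup_kb(int reloption_kb, int work_mem_kb)
{
    return (reloption_kb == CLEANUP_SIZE_UNSET) ? work_mem_kb : reloption_kb;
}

/*
 * True once the pending list has outgrown the threshold, using the same
 * comparison as ginHeapTupleFastInsert() in the patch.
 */
bool
needs_cleanup(long n_pending_pages, int reloption_kb, int work_mem_kb)
{
    long threshold_bytes;

    assert(n_pending_pages >= 0);
    threshold_bytes = effective_cleanup_kb(reloption_kb, work_mem_kb) * 1024L;
    return n_pending_pages * ASSUMED_GIN_PAGE_FREESIZE > threshold_bytes;
}
```

For example, with work_mem at 4096 kB, 600 pending pages (4,896,000 bytes under the assumed page size) trigger a cleanup when the reloption is unset, but not when the index sets it to 16384 kB.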
*** a/doc/src/sgml/gin.sgml --- b/doc/src/sgml/gin.sgml *************** *** 728,735 **** from the indexed item). As of <productname>PostgreSQL</productname> 8.4, <acronym>GIN</> is capable of postponing much of this work by inserting new tuples into a temporary, unsorted list of pending entries. ! When the table is vacuumed, or if the pending list becomes too large ! (larger than <xref linkend="guc-work-mem">), the entries are moved to the main <acronym>GIN</acronym> data structure using the same bulk insert techniques used during initial index creation. This greatly improves <acronym>GIN</acronym> index update speed, even counting the additional --- 728,736 ---- from the indexed item). As of <productname>PostgreSQL</productname> 8.4, <acronym>GIN</> is capable of postponing much of this work by inserting new tuples into a temporary, unsorted list of pending entries. ! When the table is vacuumed, or if the pending list becomes larger than ! <literal>PENDING_LIST_CLEANUP_SIZE</literal> (or ! <xref linkend="guc-work-mem"> if not set), the entries are moved to the main <acronym>GIN</acronym> data structure using the same bulk insert techniques used during initial index creation. This greatly improves <acronym>GIN</acronym> index update speed, even counting the additional *************** *** 812,829 **** </varlistentry> <varlistentry> ! <term><xref linkend="guc-work-mem"></term> <listitem> <para> During a series of insertions into an existing <acronym>GIN</acronym> index that has <literal>FASTUPDATE</> enabled, the system will clean up the pending-entry list whenever the list grows larger than ! <varname>work_mem</>. To avoid fluctuations in observed response time, ! it's desirable to have pending-list cleanup occur in the background ! (i.e., via autovacuum). Foreground cleanup operations can be avoided by ! increasing <varname>work_mem</> or making autovacuum more aggressive. ! However, enlarging <varname>work_mem</> means that if a foreground ! 
cleanup does occur, it will take even longer. </para> </listitem> </varlistentry> --- 813,839 ---- </varlistentry> <varlistentry> ! <term><literal>PENDING_LIST_CLEANUP_SIZE</> and ! <xref linkend="guc-work-mem"></term> <listitem> <para> During a series of insertions into an existing <acronym>GIN</acronym> index that has <literal>FASTUPDATE</> enabled, the system will clean up the pending-entry list whenever the list grows larger than ! <literal>PENDING_LIST_CLEANUP_SIZE</> (if not set, <varname>work_mem</> ! is used as that threshold, instead). To avoid fluctuations in observed ! response time, it's desirable to have pending-list cleanup occur in the ! background (i.e., via autovacuum). Foreground cleanup operations ! can be avoided by increasing <literal>PENDING_LIST_CLEANUP_SIZE</> ! (or <varname>work_mem</>) or making autovacuum more aggressive. ! However, enlarging the threshold of the cleanup operation means that ! if a foreground cleanup does occur, it will take even longer. ! </para> ! <para> ! <literal>PENDING_LIST_CLEANUP_SIZE</> is an index storage parameter, ! and allows each GIN index to have its own cleanup threshold. ! For example, it's possible to increase the threshold only for the GIN ! index which can be updated heavily, and decrease it otherwise. </para> </listitem> </varlistentry> *** a/doc/src/sgml/ref/create_index.sgml --- b/doc/src/sgml/ref/create_index.sgml *************** *** 356,361 **** CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ <replaceable class="parameter">name</ --- 356,377 ---- </listitem> </varlistentry> </variablelist> + <variablelist> + <varlistentry> + <term><literal>PENDING_LIST_CLEANUP_SIZE</></term> + <listitem> + <para> + This setting specifies the maximum size of the GIN pending list which is + used when <literal>FASTUPDATE</> is enabled. If the list grows larger than + this maximum size, it is cleaned up by moving the entries in it to the + main GIN data structure in bulk. The value is specified in kilobytes. 
+ If this is not set, <literal>work_mem</> is used as the maximum size + of the pending list, instead. See <xref linkend="gin-fast-update"> and + <xref linkend="gin-tips"> for more information. + </para> + </listitem> + </varlistentry> + </variablelist> </refsect2> <refsect2 id="SQL-CREATEINDEX-CONCURRENTLY"> *** a/doc/src/sgml/ref/pg_receivexlog.sgml --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 106,126 **** PostgreSQL documentation </varlistentry> <varlistentry> - <term><option>-F <replaceable class="parameter">interval</replaceable></option></term> - <term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term> - <listitem> - <para> - Specifies the maximum time to issue sync commands to ensure the - received WAL file is safely flushed to disk, in seconds. The default - value is zero, which disables issuing fsyncs except when WAL file is - closed. If <literal>-1</literal> is specified, WAL file is flushed as - soon as possible, that is, as soon as there are WAL data which has - not been flushed yet. - </para> - </listitem> - </varlistentry> - - <varlistentry> <term><option>-v</option></term> <term><option>--verbose</option></term> <listitem> --- 106,111 ---- *** a/src/backend/access/common/reloptions.c --- b/src/backend/access/common/reloptions.c *************** *** 105,111 **** static relopt_int intRelOpts[] = "Packs btree index pages only to this percentage", RELOPT_KIND_BTREE }, ! BTREE_DEFAULT_FILLFACTOR, BTREE_MIN_FILLFACTOR, 100 }, { { --- 105,111 ---- "Packs btree index pages only to this percentage", RELOPT_KIND_BTREE }, ! BTREE_DEFAULT_FILLFACTOR, BTREE_MIN_FILLFACTOR, 100, 0 }, { { *************** *** 113,119 **** static relopt_int intRelOpts[] = "Packs hash index pages only to this percentage", RELOPT_KIND_HASH }, ! HASH_DEFAULT_FILLFACTOR, HASH_MIN_FILLFACTOR, 100 }, { { --- 113,119 ---- "Packs hash index pages only to this percentage", RELOPT_KIND_HASH }, ! 
HASH_DEFAULT_FILLFACTOR, HASH_MIN_FILLFACTOR, 100, 0 }, { { *************** *** 121,127 **** static relopt_int intRelOpts[] = "Packs gist index pages only to this percentage", RELOPT_KIND_GIST }, ! GIST_DEFAULT_FILLFACTOR, GIST_MIN_FILLFACTOR, 100 }, { { --- 121,127 ---- "Packs gist index pages only to this percentage", RELOPT_KIND_GIST }, ! GIST_DEFAULT_FILLFACTOR, GIST_MIN_FILLFACTOR, 100, 0 }, { { *************** *** 129,135 **** static relopt_int intRelOpts[] = "Packs spgist index pages only to this percentage", RELOPT_KIND_SPGIST }, ! SPGIST_DEFAULT_FILLFACTOR, SPGIST_MIN_FILLFACTOR, 100 }, { { --- 129,135 ---- "Packs spgist index pages only to this percentage", RELOPT_KIND_SPGIST }, ! SPGIST_DEFAULT_FILLFACTOR, SPGIST_MIN_FILLFACTOR, 100, 0 }, { { *************** *** 137,143 **** static relopt_int intRelOpts[] = "Minimum number of tuple updates or deletes prior to vacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, INT_MAX }, { { --- 137,143 ---- "Minimum number of tuple updates or deletes prior to vacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, INT_MAX, 0 }, { { *************** *** 145,151 **** static relopt_int intRelOpts[] = "Minimum number of tuple inserts, updates or deletes prior to analyze", RELOPT_KIND_HEAP }, ! -1, 0, INT_MAX }, { { --- 145,151 ---- "Minimum number of tuple inserts, updates or deletes prior to analyze", RELOPT_KIND_HEAP }, ! -1, 0, INT_MAX, 0 }, { { *************** *** 153,159 **** static relopt_int intRelOpts[] = "Vacuum cost delay in milliseconds, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, 100 }, { { --- 153,159 ---- "Vacuum cost delay in milliseconds, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, 100, GUC_UNIT_MS }, { { *************** *** 161,167 **** static relopt_int intRelOpts[] = "Vacuum cost amount available before napping, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! 
-1, 1, 10000 }, { { --- 161,167 ---- "Vacuum cost amount available before napping, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 1, 10000, 0 }, { { *************** *** 169,175 **** static relopt_int intRelOpts[] = "Minimum age at which VACUUM should freeze a table row, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, 1000000000 }, { { --- 169,175 ---- "Minimum age at which VACUUM should freeze a table row, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, 1000000000, 0 }, { { *************** *** 177,183 **** static relopt_int intRelOpts[] = "Minimum multixact age at which VACUUM should freeze a row multixact's, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, 1000000000 }, { { --- 177,183 ---- "Minimum multixact age at which VACUUM should freeze a row multixact's, for autovacuum", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 0, 1000000000, 0 }, { { *************** *** 185,191 **** static relopt_int intRelOpts[] = "Age at which to autovacuum a table to prevent transaction ID wraparound", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 100000000, 2000000000 }, { { --- 185,191 ---- "Age at which to autovacuum a table to prevent transaction ID wraparound", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 100000000, 2000000000, 0 }, { { *************** *** 193,214 **** static relopt_int intRelOpts[] = "Multixact age at which to autovacuum a table to prevent multixact wraparound", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 100000000, 2000000000 }, { { "autovacuum_freeze_table_age", "Age at which VACUUM should perform a full table sweep to freeze row versions", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST ! }, -1, 0, 2000000000 }, { { "autovacuum_multixact_freeze_table_age", "Age of multixact at which VACUUM should perform a full table sweep to freeze row versions", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST ! 
}, -1, 0, 2000000000 }, /* list terminator */ {{NULL}} --- 193,222 ---- "Multixact age at which to autovacuum a table to prevent multixact wraparound", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST }, ! -1, 100000000, 2000000000, 0 }, { { "autovacuum_freeze_table_age", "Age at which VACUUM should perform a full table sweep to freeze row versions", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST ! }, -1, 0, 2000000000, 0 }, { { "autovacuum_multixact_freeze_table_age", "Age of multixact at which VACUUM should perform a full table sweep to freeze row versions", RELOPT_KIND_HEAP | RELOPT_KIND_TOAST ! }, -1, 0, 2000000000, 0 }, + { + { + "pending_list_cleanup_size", + "Maximum size of the pending list for this GIN index, in kilobytes.", + RELOPT_KIND_GIN + }, + -1, 0, MAX_KILOBYTES, GUC_UNIT_KB + }, /* list terminator */ {{NULL}} *************** *** 503,509 **** add_bool_reloption(bits32 kinds, char *name, char *desc, bool default_val) */ void add_int_reloption(bits32 kinds, char *name, char *desc, int default_val, ! int min_val, int max_val) { relopt_int *newoption; --- 511,517 ---- */ void add_int_reloption(bits32 kinds, char *name, char *desc, int default_val, ! int min_val, int max_val, int flags_val) { relopt_int *newoption; *************** *** 512,517 **** add_int_reloption(bits32 kinds, char *name, char *desc, int default_val, --- 520,526 ---- newoption->default_val = default_val; newoption->min = min_val; newoption->max = max_val; + newoption->flags = flags_val; add_reloption((relopt_gen *) newoption); } *************** *** 1003,1014 **** parse_one_reloption(relopt_value *option, char *text_str, int text_len, case RELOPT_TYPE_INT: { relopt_int *optint = (relopt_int *) option->gen; ! parsed = parse_int(value, &option->values.int_val, 0, NULL); if (validate && !parsed) ereport(ERROR, (errmsg("invalid value for integer option \"%s\": %s", ! 
option->gen->name, value))); if (validate && (option->values.int_val < optint->min || option->values.int_val > optint->max)) ereport(ERROR, --- 1012,1026 ---- case RELOPT_TYPE_INT: { relopt_int *optint = (relopt_int *) option->gen; + const char *hintmsg; ! parsed = parse_int(value, &option->values.int_val, ! optint->flags, &hintmsg); if (validate && !parsed) ereport(ERROR, (errmsg("invalid value for integer option \"%s\": %s", ! option->gen->name, value), ! hintmsg ? errhint("%s", _(hintmsg)) : 0)); if (validate && (option->values.int_val < optint->min || option->values.int_val > optint->max)) ereport(ERROR, *** a/src/backend/access/gin/ginfast.c --- b/src/backend/access/gin/ginfast.c *************** *** 227,232 **** ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) --- 227,233 ---- ginxlogUpdateMeta data; bool separateList = false; bool needCleanup = false; + int cleanupSize; if (collector->ntuples == 0) return; *************** *** 421,431 **** ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) * ginInsertCleanup could take significant amount of time, so we prefer to * call it when it can do all the work in a single collection cycle. In * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it ! * while pending list is still small enough to fit into work_mem. * * ginInsertCleanup() should not be called inside our CRIT_SECTION. */ ! if (metadata->nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L) needCleanup = true; UnlockReleaseBuffer(metabuffer); --- 422,436 ---- * ginInsertCleanup could take significant amount of time, so we prefer to * call it when it can do all the work in a single collection cycle. In * non-vacuum mode, it shouldn't require maintenance_work_mem, so fire it ! * while pending list is still small enough to fit into ! * pending_list_cleanup_size (or work_mem if not set). * * ginInsertCleanup() should not be called inside our CRIT_SECTION. */ ! 
cleanupSize = GinGetPendingListCleanupSize(index); ! if (cleanupSize == GIN_DEFAULT_PENDING_LIST_CLEANUP_SIZE) ! cleanupSize = work_mem; ! if (metadata->nPendingPages * GIN_PAGE_FREESIZE > cleanupSize * 1024L) needCleanup = true; UnlockReleaseBuffer(metabuffer); *** a/src/backend/access/gin/ginutil.c --- b/src/backend/access/gin/ginutil.c *************** *** 524,530 **** ginoptions(PG_FUNCTION_ARGS) GinOptions *rdopts; int numoptions; static const relopt_parse_elt tab[] = { ! {"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)} }; options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIN, --- 524,532 ---- GinOptions *rdopts; int numoptions; static const relopt_parse_elt tab[] = { ! {"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)}, ! {"pending_list_cleanup_size", RELOPT_TYPE_INT, offsetof(GinOptions, ! pendingListCleanupSize)} }; options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIN, *** a/src/backend/utils/misc/guc.c --- b/src/backend/utils/misc/guc.c *************** *** 95,108 **** #define CONFIG_EXEC_PARAMS_NEW "global/config_exec_params.new" #endif - /* upper limit for GUC variables measured in kilobytes of memory */ - /* note that various places assume the byte size fits in a "long" variable */ - #if SIZEOF_SIZE_T > 4 && SIZEOF_LONG > 4 - #define MAX_KILOBYTES INT_MAX - #else - #define MAX_KILOBYTES (INT_MAX / 1024) - #endif - #define KB_PER_MB (1024) #define KB_PER_GB (1024*1024) #define KB_PER_TB (1024*1024*1024) --- 95,100 ---- *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 371,377 **** LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! 
NULL, 0)) /* * Any errors will already have been reported in the function process, --- 371,377 ---- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 36,42 **** static char *basedir = NULL; static int verbose = 0; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ - static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; --- 36,41 ---- *************** *** 63,70 **** usage(void) printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); - printf(_(" -F --fsync-interval=INTERVAL\n" - " frequency of syncs to transaction log files (in seconds)\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); --- 62,67 ---- *************** *** 333,340 **** StreamLog(void) starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial", ! fsync_interval); PQfinish(conn); } --- 330,336 ---- starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! 
stop_streaming, standby_message_timeout, ".partial"); PQfinish(conn); } *************** *** 364,370 **** main(int argc, char **argv) {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, {"no-loop", no_argument, NULL, 'n'}, - {"fsync-interval", required_argument, NULL, 'F'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, --- 360,365 ---- *************** *** 394,400 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv", long_options, &option_index)) != -1) { switch (c) --- 389,395 ---- } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv", long_options, &option_index)) != -1) { switch (c) *************** *** 441,455 **** main(int argc, char **argv) case 'n': noloop = 1; break; - case 'F': - fsync_interval = atoi(optarg) * 1000; - if (fsync_interval < -1000) - { - fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"), - progname, optarg); - exit(1); - } - break; case 'v': verbose++; break; --- 436,441 ---- *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 31,44 **** static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; - static int64 last_fsync = -1; /* timestamp of last WAL file flush */ static bool still_sending = true; /* feedback still needs to be sent? */ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos, ! 
int fsync_interval); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, --- 31,42 ---- static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static bool still_sending = true; /* feedback still needs to be sent? */ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, *************** *** 50,62 **** static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, XLogRecPtr blockpos, char *basedir, char *partial_suffix, XLogRecPtr *stoppos); - static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, - uint32 timeline, char *basedir, - stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos); - static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, - int64 last_status, int fsync_interval, - XLogRecPtr blockpos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); --- 48,53 ---- *************** *** 209,215 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) progname, current_walfile_name, partial_suffix); lastFlushPosition = pos; - last_fsync = feGetCurrentTimestamp(); return true; } --- 200,205 ---- *************** *** 440,456 **** CheckServerVersionForStreaming(PGconn *conn) * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. 
* - * fsync_interval controls how often we flush to the received WAL file, - * in milliseconds. - * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix, ! int fsync_interval) { char query[128]; char slotcmd[128]; --- 430,442 ---- * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix) { char query[128]; char slotcmd[128]; *************** *** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, fsync_interval); if (res == NULL) goto error; --- 581,587 ---- /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos); if (res == NULL) goto error; *************** *** 760,766 **** static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int fsync_interval) { char *copybuf = NULL; int64 last_status = -1; --- 746,752 ---- HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! 
XLogRecPtr *stoppos) { char *copybuf = NULL; int64 last_status = -1; *************** *** 777,812 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Check if we should continue streaming, or abort at this point. */ ! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos)) ! goto error; ! ! now = feGetCurrentTimestamp(); ! ! /* ! * If fsync_interval has elapsed since last WAL flush and we've written ! * some WAL data, flush them to disk. ! */ ! if (lastFlushPosition < blockpos && ! walfile != -1 && ! ((fsync_interval > 0 && ! feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) || ! fsync_interval < 0)) { ! if (fsync(walfile) != 0) { ! fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), ! progname, current_walfile_name, strerror(errno)); goto error; } ! ! lastFlushPosition = blockpos; ! last_fsync = now; } /* * Potentially send a status message to the master */ if (still_sending && standby_message_timeout > 0 && feTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) --- 763,788 ---- /* * Check if we should continue streaming, or abort at this point. */ ! if (still_sending && stream_stop(blockpos, timeline, false)) { ! if (!close_walfile(basedir, partial_suffix, blockpos)) { ! /* Potential error message is written by close_walfile */ goto error; } ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) ! { ! fprintf(stderr, _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! goto error; ! } ! still_sending = false; } /* * Potentially send a status message to the master */ + now = feGetCurrentTimestamp(); if (still_sending && standby_message_timeout > 0 && feTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) *************** *** 818,875 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, } /* ! * Calculate how long send/receive loops should sleep */ ! 
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, ! last_status, fsync_interval, blockpos); ! ! r = CopyStreamReceive(conn, sleeptime, &copybuf); ! while (r != 0) { ! if (r == -1) ! goto error; ! if (r == -2) { ! PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, ! basedir, partial_suffix, stoppos); ! if (res == NULL) ! goto error; ! else ! return res; } ! /* Check the message type. */ ! if (copybuf[0] == 'k') ! { ! if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, ! &last_status)) ! goto error; ! } ! else if (copybuf[0] == 'w') ! { ! if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, ! timeline, basedir, stream_stop, partial_suffix)) ! goto error; ! ! /* ! * Check if we should continue streaming, or abort at this point. ! */ ! if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, ! stream_stop, partial_suffix, stoppos)) ! goto error; ! } ! else ! { ! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), ! progname, copybuf[0]); goto error; ! } ! /* ! * Process the received data, and any subsequent data we ! * can read without blocking. ! */ ! r = CopyStreamReceive(conn, 0, &copybuf); } } --- 794,857 ---- } /* ! * Compute how long send/receive loops should sleep */ ! if (standby_message_timeout && still_sending) { ! int64 targettime; ! long secs; ! int usecs; ! ! targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); ! feTimestampDifference(now, ! targettime, ! &secs, ! &usecs); ! /* Always sleep at least 1 sec */ ! if (secs <= 0) { ! secs = 1; ! usecs = 0; } ! sleeptime = secs * 1000 + usecs / 1000; ! } ! else ! sleeptime = -1; ! ! r = CopyStreamReceive(conn, sleeptime, &copybuf); ! if (r == 0) ! continue; ! if (r == -1) ! goto error; ! if (r == -2) ! { ! PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, ! basedir, partial_suffix, stoppos); ! if (res == NULL) goto error; ! else ! return res; ! } ! /* Check the message type. */ ! if (copybuf[0] == 'k') ! { !
if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, ! &last_status)) ! goto error; ! } ! else if (copybuf[0] == 'w') ! { ! if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, ! timeline, basedir, stream_stop, partial_suffix)) ! goto error; ! } ! else ! { ! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), ! progname, copybuf[0]); ! goto error; } } *************** *** 1211,1290 **** HandleEndOfCopyStream(PGconn *conn, char *copybuf, *stoppos = blockpos; return res; } - - /* - * Check if we should continue streaming, or abort at this point. - */ - static bool - CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, - char *basedir, stream_stop_callback stream_stop, - char *partial_suffix, XLogRecPtr *stoppos) - { - if (still_sending && stream_stop(blockpos, timeline, false)) - { - if (!close_walfile(basedir, partial_suffix, blockpos)) - { - /* Potential error message is written by close_walfile */ - return false; - } - if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) - { - fprintf(stderr, _("%s: could not send copy-end packet: %s"), - progname, PQerrorMessage(conn)); - return false; - } - still_sending = false; - } - - return true; - } - - /* - * Calculate how long send/receive loops should sleep - */ - static long - CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, - int64 last_status, int fsync_interval, XLogRecPtr blockpos) - { - int64 targettime = 0; - int64 status_targettime = 0; - int64 fsync_targettime = 0; - long sleeptime; - - if (standby_message_timeout && still_sending) - status_targettime = last_status + - (standby_message_timeout - 1) * ((int64) 1000); - - if (fsync_interval > 0 && lastFlushPosition < blockpos) - fsync_targettime = last_fsync + - (fsync_interval - 1) * ((int64) 1000); - - if ((status_targettime < fsync_targettime && status_targettime > 0) || - fsync_targettime == 0) - targettime = status_targettime; - else - targettime = fsync_targettime; - - if (targettime > 0) - { - long secs; - 
int usecs; - - feTimestampDifference(now, - targettime, - &secs, - &usecs); - /* Always sleep at least 1 sec */ - if (secs <= 0) - { - secs = 1; - usecs = 0; - } - - sleeptime = secs * 1000 + usecs / 1000; - } - else - sleeptime = -1; - - return sleeptime; - } --- 1193,1195 ---- *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 16,20 **** extern bool ReceiveXlogStream(PGconn *conn, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, ! int fsync_interval); --- 16,19 ---- char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix); *** a/src/bin/psql/tab-complete.c --- b/src/bin/psql/tab-complete.c *************** *** 1134,1140 **** psql_completion(const char *text, int start, int end) pg_strcasecmp(prev_wd, "(") == 0) { static const char *const list_INDEXOPTIONS[] = ! {"fillfactor", "fastupdate", NULL}; COMPLETE_WITH_LIST(list_INDEXOPTIONS); } --- 1134,1140 ---- pg_strcasecmp(prev_wd, "(") == 0) { static const char *const list_INDEXOPTIONS[] = ! {"fillfactor", "fastupdate", "pending_list_cleanup_size", NULL}; COMPLETE_WITH_LIST(list_INDEXOPTIONS); } *** a/src/include/access/gin_private.h --- b/src/include/access/gin_private.h *************** *** 314,325 **** typedef struct GinOptions --- 314,331 ---- { int32 vl_len_; /* varlena header (do not touch directly!) */ bool useFastUpdate; /* use fast updates? */ + int pendingListCleanupSize; /* maximum size of pending list */ } GinOptions; #define GIN_DEFAULT_USE_FASTUPDATE true #define GinGetUseFastUpdate(relation) \ ((relation)->rd_options ? \ ((GinOptions *) (relation)->rd_options)->useFastUpdate : GIN_DEFAULT_USE_FASTUPDATE) + #define GIN_DEFAULT_PENDING_LIST_CLEANUP_SIZE -1 + #define GinGetPendingListCleanupSize(relation) \ + ((relation)->rd_options ? 
\ + ((GinOptions *) (relation)->rd_options)->pendingListCleanupSize : \ + GIN_DEFAULT_PENDING_LIST_CLEANUP_SIZE) /* Macros for buffer lock/unlock operations */ *** a/src/include/access/reloptions.h --- b/src/include/access/reloptions.h *************** *** 92,97 **** typedef struct relopt_int --- 92,98 ---- int default_val; int min; int max; + int flags; } relopt_int; typedef struct relopt_real *************** *** 244,250 **** extern relopt_kind add_reloption_kind(void); extern void add_bool_reloption(bits32 kinds, char *name, char *desc, bool default_val); extern void add_int_reloption(bits32 kinds, char *name, char *desc, ! int default_val, int min_val, int max_val); extern void add_real_reloption(bits32 kinds, char *name, char *desc, double default_val, double min_val, double max_val); extern void add_string_reloption(bits32 kinds, char *name, char *desc, --- 245,251 ---- extern void add_bool_reloption(bits32 kinds, char *name, char *desc, bool default_val); extern void add_int_reloption(bits32 kinds, char *name, char *desc, ! int default_val, int min_val, int max_val, int flags_val); extern void add_real_reloption(bits32 kinds, char *name, char *desc, double default_val, double min_val, double max_val); extern void add_string_reloption(bits32 kinds, char *name, char *desc, *** a/src/include/utils/guc.h --- b/src/include/utils/guc.h *************** *** 18,23 **** --- 18,31 ---- #include "utils/array.h" + /* upper limit for GUC variables measured in kilobytes of memory */ + /* note that various places assume the byte size fits in a "long" variable */ + #if SIZEOF_SIZE_T > 4 && SIZEOF_LONG > 4 + #define MAX_KILOBYTES INT_MAX + #else + #define MAX_KILOBYTES (INT_MAX / 1024) + #endif + /* * Certain options can only be set at certain times. The rules are * like this:
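The reloptions.c hunk above passes each option's flags (GUC_UNIT_KB for pending_list_cleanup_size) through to parse_int(), which presumably lets the new storage parameter accept memory units the way work_mem does, with the result checked against the 0..MAX_KILOBYTES range declared in intRelOpts[]. A rough standalone approximation of that parsing follows. It is not parse_int() itself: parse_kb_value() is a hypothetical name, only the kB, MB, GB and TB suffixes are recognized (case-sensitively), negative input is simply rejected, and the caller passes in the upper limit instead of it being taken from guc.h.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical sketch: parse "2048", "16MB", "1 GB", ... into kilobytes.
 * A bare number is taken as kB. Returns false on a syntax error, an
 * unknown unit, or a result larger than max_kb.
 */
bool
parse_kb_value(const char *text, long max_kb, long *result_kb)
{
    static const struct { const char *unit; long multiplier_kb; } units[] = {
        {"kB", 1L}, {"MB", 1024L}, {"GB", 1024L * 1024L},
        {"TB", 1024L * 1024L * 1024L}
    };
    const size_t nunits = sizeof(units) / sizeof(units[0]);
    char   *end;
    long    value;
    size_t  i;

    assert(text != NULL && result_kb != NULL);

    errno = 0;
    value = strtol(text, &end, 10);
    if (end == text || errno == ERANGE || value < 0)
        return false;                   /* no digits, overflow, or negative */

    while (*end == ' ')                 /* allow "16 MB" as well as "16MB" */
        end++;

    if (*end != '\0')
    {
        for (i = 0; i < nunits; i++)
            if (strcmp(end, units[i].unit) == 0)
                break;
        if (i == nunits)
            return false;               /* unknown unit */
        if (value > max_kb / units[i].multiplier_kb)
            return false;               /* multiplying would exceed the limit */
        value *= units[i].multiplier_kb;
    }

    if (value > max_kb)
        return false;
    *result_kb = value;
    return true;
}
```

For example, with max_kb set to INT_MAX (the value MAX_KILOBYTES takes when size_t and long are both wider than 4 bytes), "16MB" yields 16384, while "2TB" and "16mb" are rejected.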
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers