Hi.
We had some real cases where a client set a rather big batch_size at the
server level, but for some foreign tables containing large documents it
was inadequate and led to OOM killer intervention. You can argue that
batch_size can be set at the foreign table level, but that can still be
not flexible enough when tuple size varies. I suppose the same also
applies to fetch_size. The issue here is that we can't limit the
size of data (as opposed to the number of rows) while fetching from a
cursor. But we can use a tuplestore to hold the fetched results, so that
they spill to disk.
I'm attaching two patches which try to fix the issues with possibly huge
memory usage by postgres_fdw batches.
With fetched tuples we still can't rely on a tuplestore alone, as ctids are
not preserved, so we have to store them separately.
The reproducer for insert is simple.
create extension postgres_fdw ;
create server loopback foreign data wrapper postgres_fdw options (dbname
'postgres', port '5432', batch_size '100', fetch_size '100');
create table base_table(i int, s bytea);
create foreign table foreign_table (i int, s bytea) server loopback
options(table_name 'base_table');
create user mapping for public server loopback ;
insert into foreign_table select i,
pg_read_binary_file('/some/big/file') from generate_series(1,1000) i;
will easily grow backend RSS to several gigabytes.
The first patch fixes this problem.
The second patch alleviates the second issue - SELECT * queries can also
grow backend memory to several GBs. Memory usage can still peak (on my
toy examples) at up to 3-4 GB, but at least it seems to be 1-2 GB less than
with the non-patched version.
--
Best regards,
Alexander Pyhalov,
Postgres Professional
From 7bb6aeadd318ede7f78b98908d58a432bee3309f Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Tue, 16 Dec 2025 18:46:27 +0300
Subject: [PATCH 1/2] Limit batch_size for foreign insert with work_mem
The batch_size option can be set at the foreign server level. It can be overly
optimistic even for a single table whose tuple lengths vary. To prevent large
memory usage, limit the effective batch size with work_mem.
---
src/backend/executor/nodeModifyTable.c | 38 ++++++++++++++++++++++++--
1 file changed, 35 insertions(+), 3 deletions(-)
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 874b71e6608..19cedde0faa 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -176,6 +176,8 @@ static TupleTableSlot *ExecMergeNotMatched(ModifyTableContext *context,
ResultRelInfo *resultRelInfo,
bool canSetTag);
+static Size estimate_batch_length(ResultRelInfo *resultRelInfo);
+
/*
* Verify that the tuples to be produced by INSERT match the
@@ -828,6 +830,33 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
return ExecProject(newProj);
}
+/*
+ * estimate_batch_length
+ * When batch foreign insert is enabled, estimate the size of the
+ * currently gathered batch.
+ *
+ * Walks the first ri_NumSlots entries of resultRelInfo->ri_Slots, calls
+ * slot_getallattrs() on each slot, and sums the per-slot results of
+ * heap_compute_data_size().  The sum is returned as a Size; it is 0 when
+ * no slots are buffered.
+ *
+ * NOTE(review): the caller compares the returned value directly against
+ * work_mem.  Presumably this value is in bytes while work_mem is expressed
+ * in kilobytes (see the PostgreSQL GUC documentation); if so, the two need
+ * a common unit, e.g. (Size) work_mem * 1024 -- confirm.
+ *
+ * NOTE(review): the caller appears to invoke this once per inserted row,
+ * and every call rescans all buffered slots, so the total work per batch
+ * looks quadratic in the batch size.  Keeping a running total next to
+ * ri_NumSlots would avoid the rescan -- confirm.
+ */
+static Size
+estimate_batch_length(ResultRelInfo *resultRelInfo)
+{
+ Size batch_len = 0; /* running total over all buffered slots */
+ int i = 0;
+
+ for (i = 0; i < resultRelInfo->ri_NumSlots; i++)
+ {
+ TupleTableSlot *slot;
+ Size slot_tuple_len;
+
+ slot = resultRelInfo->ri_Slots[i];
+
+ slot_getallattrs(slot); /* done before tts_values/tts_isnull are read below */
+ slot_tuple_len = heap_compute_data_size(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull);
+ batch_len += slot_tuple_len;
+ }
+
+ return batch_len;
+}
+
+
/* ----------------------------------------------------------------
* ExecInsert
*
@@ -943,12 +972,15 @@ ExecInsert(ModifyTableContext *context,
if (resultRelInfo->ri_BatchSize > 1)
{
bool flushed = false;
+ Size batch_len;
+
+ batch_len = estimate_batch_length(resultRelInfo);
/*
- * When we've reached the desired batch size, perform the
- * insertion.
+ * When we've reached the desired batch size or exceeded work_mem,
+ * perform the insertion.
*/
- if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize)
+ if (resultRelInfo->ri_NumSlots == resultRelInfo->ri_BatchSize || batch_len > work_mem)
{
ExecBatchInsert(mtstate, resultRelInfo,
resultRelInfo->ri_Slots,
--
2.43.0
From 44a4a6b8fc2910ab8bb241585855532da00f0e84 Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <[email protected]>
Date: Thu, 18 Dec 2025 09:17:42 +0300
Subject: [PATCH 2/2] Use tuplestore in PgFdwScanState scan state to limit
memory usage
A tuplestore doesn't preserve ctids, so we need to store them
separately.
---
contrib/postgres_fdw/postgres_fdw.c | 74 +++++++++++++++++++++++++----
1 file changed, 66 insertions(+), 8 deletions(-)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 5e178c21b39..389e906a5d5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -158,8 +158,12 @@ typedef struct PgFdwScanState
const char **param_values; /* textual values of query parameters */
/* for storing result tuples */
- HeapTuple *tuples; /* array of currently-retrieved tuples */
- int num_tuples; /* # of tuples in array */
+ Tuplestorestate *result_store; /* currently-retrieved tuples */
+ ItemPointerData *ctids; /* separate store for ctids */
+
+ TupleTableSlot *tupstore_slot; /* slot needed for retrieving tuples from
+ * tuplestore */
+ int num_tuples; /* # of tuples in tuple store */
int next_tuple; /* index of next one to return */
/* batch-level state, for optimizing rewinds and avoiding useless fetch */
@@ -546,6 +550,8 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
+static void clean_scan_state_result_store(PgFdwScanState *fsstate);
+
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -1605,6 +1611,7 @@ postgresIterateForeignScan(ForeignScanState *node)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ HeapTuple newtup;
/*
* In sync mode, if this is the first call after Begin or ReScan, we need
@@ -1631,12 +1638,31 @@ postgresIterateForeignScan(ForeignScanState *node)
return ExecClearTuple(slot);
}
+ if (!tuplestore_gettupleslot(fsstate->result_store, true, true,
+ fsstate->tupstore_slot))
+ {
+ /*
+ * We've checked that next_tuple is less than fsstate->num_tuples, so
+ * there should be some result
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("unexpected end of store")));
+
+ }
+
+ newtup = ExecFetchSlotHeapTuple(fsstate->tupstore_slot, true, NULL);
+ /* Tuple store doesn't preserve ctid, so restore it separately */
+ newtup->t_self = newtup->t_data->t_ctid = fsstate->ctids[fsstate->next_tuple];
+ ExecClearTuple(fsstate->tupstore_slot);
+
/*
* Return the next tuple.
*/
- ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
+ ExecStoreHeapTuple(newtup,
slot,
false);
+ fsstate->next_tuple++;
return slot;
}
@@ -1699,6 +1725,8 @@ postgresReScanForeignScan(ForeignScanState *node)
{
/* Easy: just rescan what we already have in memory, if anything */
fsstate->next_tuple = 0;
+ if (fsstate->result_store)
+ tuplestore_rescan(fsstate->result_store);
return;
}
@@ -1708,7 +1736,7 @@ postgresReScanForeignScan(ForeignScanState *node)
PQclear(res);
/* Now force a fresh FETCH. */
- fsstate->tuples = NULL;
+ clean_scan_state_result_store(fsstate);
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
@@ -1737,6 +1765,8 @@ postgresEndForeignScan(ForeignScanState *node)
ReleaseConnection(fsstate->conn);
fsstate->conn = NULL;
+ clean_scan_state_result_store(fsstate);
+
/* MemoryContexts will be deleted automatically. */
}
@@ -3781,7 +3811,8 @@ create_cursor(ForeignScanState *node)
/* Mark the cursor as created, and show no tuples have been retrieved */
fsstate->cursor_exists = true;
- fsstate->tuples = NULL;
+ clean_scan_state_result_store(fsstate);
+
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
@@ -3808,7 +3839,8 @@ fetch_more_data(ForeignScanState *node)
* We'll store the tuples in the batch_cxt. First, flush the previous
* batch.
*/
- fsstate->tuples = NULL;
+ clean_scan_state_result_store(fsstate);
+
MemoryContextReset(fsstate->batch_cxt);
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
@@ -3844,21 +3876,28 @@ fetch_more_data(ForeignScanState *node)
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->result_store = tuplestore_begin_heap(true, false, work_mem);
+ fsstate->tupstore_slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
fsstate->num_tuples = numrows;
fsstate->next_tuple = 0;
+ fsstate->ctids = palloc0(numrows * sizeof(ItemPointerData));
for (i = 0; i < numrows; i++)
{
+ HeapTuple newtup;
+
Assert(IsA(node->ss.ps.plan, ForeignScan));
- fsstate->tuples[i] =
+ newtup =
make_tuple_from_result_row(res, i,
fsstate->rel,
fsstate->attinmeta,
fsstate->retrieved_attrs,
node,
fsstate->temp_cxt);
+ tuplestore_puttuple(fsstate->result_store, newtup);
+ ItemPointerCopy(&newtup->t_self, &fsstate->ctids[i]);
+ heap_freetuple(newtup);
}
/* Update fetch_ct_2 */
@@ -7636,6 +7675,25 @@ make_tuple_from_result_row(PGresult *res,
return tuple;
}
+/*
+ * Clean PgFdwScanState result store
+ *
+ * If fsstate->result_store is set, end the tuplestore, drop the slot used
+ * to read tuples back from it, and set result_store, tupstore_slot and
+ * ctids to NULL.  Does nothing when no result store exists, so it is safe
+ * to call repeatedly.  num_tuples and next_tuple are left unchanged.
+ */
+static void
+clean_scan_state_result_store(PgFdwScanState *fsstate)
+{
+ if (fsstate->result_store)
+ {
+ /*
+ * We partially rely on the fact that batch_cxt is also reset:
+ * the ctids array is not freed here, only its pointer is cleared.
+ * NOTE(review): presumably ctids is allocated in batch_cxt and is
+ * reclaimed when that context is reset -- confirm for every caller.
+ */
+ tuplestore_end(fsstate->result_store);
+ ExecDropSingleTupleTableSlot(fsstate->tupstore_slot);
+ fsstate->result_store = NULL;
+ fsstate->tupstore_slot = NULL;
+ fsstate->ctids = NULL;
+ }
+}
+
/*
* Callback function which is called when error occurs during column value
* conversion. Print names of column and relation.
--
2.43.0