From bb4d915dcd16b309159911a818ab006abce3838e Mon Sep 17 00:00:00 2001
From: Rafia Sabih <rafia.sabih@cybertec.at>
Date: Mon, 29 Sep 2025 16:19:37 +0200
Subject: [PATCH] Fetch without cursors

This adds a GUC to enable/ disable cursor mode in postgres_fdw.
The GUC is called postgres_fdw.use_cursor. When it is set, everything
works as it is now. However, there is a limitation to the current
mechanism, it is unable to use parallel plans at the local side because
of the use of cursors. Now, to overcome this one can unset the
abovementioned GUC.
In non-cursor mode cursors are not used and hence the parallel plans
can be used at the local side. In non-cursor mode fetch_size is used to as is.

A caveat with the non-cursor mode is that when simultaneous queries are
fired at the local side, i.e. more than one cursor is opened at a time,
then we use Tuplestore to save the tuples for the already running query before
creating the second cursor. Now, when in further iterations when fetch is called
for the first cursor, then the tuples are retrieved from the tuplestore.

Original idea: Bernd Helmle
Key suggestions: Robert Haas
---
 contrib/postgres_fdw/connection.c   |   7 +
 contrib/postgres_fdw/option.c       |  17 +
 contrib/postgres_fdw/postgres_fdw.c | 489 +++++++++++++++++++++++-----
 contrib/postgres_fdw/postgres_fdw.h |  20 +-
 4 files changed, 442 insertions(+), 91 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4fbb6c182b8..5e20d9f2040 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -965,6 +965,13 @@ pgfdw_get_result(PGconn *conn)
 	return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
 }
 
+PGresult *
+pgfdw_get_next_result(PGconn *conn)
+{
+	return libpqsrv_get_result(conn, pgfdw_we_get_result);
+}
+
+
 /*
  * Report an error we got from the remote server.
  *
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 04788b7e8b3..1f0923f7de8 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -44,6 +44,7 @@ static PgFdwOption *postgres_fdw_options;
  * GUC parameters
  */
 char	   *pgfdw_application_name = NULL;
+bool pgfdw_use_cursor = true;
 
 /*
  * Helper functions
@@ -586,5 +587,21 @@ _PG_init(void)
 							   NULL,
 							   NULL);
 
+	/*
+	 * If use_cursor is set to false, then the new way of fetching is used, which allows for the
+	 * use of parallel plans at the local side. In the cursor mode, parallel plans could not be
+	 * used.
+	 */
+	DefineCustomBoolVariable("postgres_fdw.use_cursor",
+							"If set uses the cursor, otherwise fetches without cursor",
+							NULL,
+							&pgfdw_use_cursor,
+							true,
+							PGC_USERSET,
+							0,
+							NULL,
+							NULL,
+							NULL);
+
 	MarkGUCPrefixReserved("postgres_fdw");
 }
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 456b267f70b..0d494e0f249 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -22,6 +22,7 @@
 #include "commands/explain_format.h"
 #include "commands/explain_state.h"
 #include "executor/execAsync.h"
+#include "executor/executor.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -174,7 +175,7 @@ typedef struct PgFdwScanState
 	MemoryContext temp_cxt;		/* context for per-tuple temporary data */
 
 	int			fetch_size;		/* number of tuples per fetch */
-} PgFdwScanState;
+;} PgFdwScanState;
 
 /*
  * Execution state of a foreign insert/update/delete operation.
@@ -451,7 +452,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 									  EquivalenceClass *ec, EquivalenceMember *em,
 									  void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool use_tts);
 static void close_cursor(PGconn *conn, unsigned int cursor_number,
 						 PgFdwConnState *conn_state);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -546,7 +547,6 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
 							  const PgFdwRelationInfo *fpinfo_i);
 static int	get_batch_size_option(Relation rel);
 
-
 /*
  * Foreign-data wrapper handler function: return a struct with pointers
  * to my callback routines.
@@ -1548,6 +1548,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 	/* Get private info created by planner functions. */
 	fsstate->query = strVal(list_nth(fsplan->fdw_private,
 									 FdwScanPrivateSelectSql));
+
 	fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
 												 FdwScanPrivateRetrievedAttrs);
 	fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
@@ -1593,6 +1594,63 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
 
 	/* Set the async-capable flag */
 	fsstate->async_capable = node->ss.ps.async_capable;
+	fsstate->conn_state->last_attinmeta = NULL;
+	fsstate->conn_state->last_query = NULL;
+	fsstate->conn_state->tuplestore = NULL;
+	fsstate->conn_state->total_tuples = 0;
+	fsstate->conn_state->last_retrieved_attrs = NULL;
+}
+
+static void
+fillTupleSlot(PgFdwScanState *fsstate, ForeignScanState *node)
+{
+	char *cur_query = fsstate->query;
+	const char **values = fsstate->param_values;
+	int	numParams = fsstate->numParams;
+	Relation cur_rel = fsstate->rel;
+	TupleDesc cur_tupdesc = fsstate->tupdesc;
+	AttInMetadata *cur_attinmeta = fsstate->attinmeta;
+	List *cur_retrieved_attrs = fsstate->retrieved_attrs;
+	StringInfoData buf;
+
+	fsstate->conn_state->cursor_number = fsstate->cursor_number-1;
+	initStringInfo(&buf);
+
+	/* Populate the fsstate with the details stored in conn_state for the last query.*/
+	fsstate->query = fsstate->conn_state->last_query;
+	fsstate->tupdesc = fsstate->conn_state->last_tupdesc;
+	fsstate->rel = fsstate->conn_state->last_rel;
+	fsstate->attinmeta = fsstate->conn_state->last_attinmeta;
+	fsstate->retrieved_attrs = fsstate->conn_state->last_retrieved_attrs;
+	if (fsstate->conn->asyncStatus == PGASYNC_IDLE)
+	{
+		/* Setting the query to run for the exisiting cursor */
+		appendStringInfo(&buf, "%s", fsstate->query);
+
+		if (!PQsendQueryParams(fsstate->conn, buf.data, numParams,
+							NULL, values, NULL, NULL, 0))
+			pgfdw_report_error(NULL, false, buf.data);
+
+		/* Call for Chunked rows mode with same size of chunk as the fetch size */
+		if (!PQsetChunkedRowsMode(fsstate->conn, fsstate->fetch_size))
+			pgfdw_report_error(NULL, false, buf.data);
+	}
+	fetch_more_data(node, true);
+
+	fsstate->conn_state->last_query_processed = true;
+	fsstate->conn_state->last_query = NULL;
+	fsstate->num_tuples = 0;
+	fsstate->tuples = NULL;
+	if (fsstate->conn_state->last_tupdesc)
+	{
+		ReleaseTupleDesc(fsstate->conn_state->last_tupdesc);
+		fsstate->conn_state->last_tupdesc = NULL;
+	}
+	fsstate->query = cur_query;
+	fsstate->tupdesc = cur_tupdesc;
+	fsstate->rel = cur_rel;
+	fsstate->attinmeta = cur_attinmeta;
+	fsstate->retrieved_attrs = cur_retrieved_attrs;
 }
 
 /*
@@ -1613,8 +1671,21 @@ postgresIterateForeignScan(ForeignScanState *node)
 	 * first call after Begin or ReScan.
 	 */
 	if (!fsstate->cursor_exists)
-		create_cursor(node);
-
+	{
+		if (fsstate->conn_state->last_query)
+		{
+			/*
+			 * At close cursor function, the last_query null so it should
+			 * only reach here when the current cursor is still open.
+			 */
+			fillTupleSlot(fsstate, node);
+			create_cursor(node);
+		}
+		else
+		{
+			create_cursor(node);
+		}
+	}
 	/*
 	 * Get some more tuples, if we've run out.
 	 */
@@ -1623,21 +1694,47 @@ postgresIterateForeignScan(ForeignScanState *node)
 		/* In async mode, just clear tuple slot. */
 		if (fsstate->async_capable)
 			return ExecClearTuple(slot);
+
 		/* No point in another fetch if we already detected EOF, though. */
 		if (!fsstate->eof_reached)
-			fetch_more_data(node);
+			fetch_more_data(node, false);
+
 		/* If we didn't get any tuples, must be end of data. */
 		if (fsstate->next_tuple >= fsstate->num_tuples)
+		{
+			if (!pgfdw_use_cursor)
+			{
+				if (fsstate->cursor_number == fsstate->conn_state->cursor_number &&
+					fsstate->conn_state->tuplestore)
+						MemoryContextReset(fsstate->conn_state->tupstore_context);
+				if (fsstate->conn_state->last_tupdesc)
+				{
+					ReleaseTupleDesc(fsstate->conn_state->last_tupdesc);
+					fsstate->conn_state->last_tupdesc = NULL;
+				}
+			}
 			return ExecClearTuple(slot);
+		}
 	}
 
 	/*
 	 * Return the next tuple.
 	 */
-	ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
-					   slot,
-					   false);
+	if (fsstate->tuples[fsstate->next_tuple])
+		ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
+						slot,
+						false);
 
+	else
+	{
+		MemoryContextReset(fsstate->conn_state->tupstore_context);
+		if (!pgfdw_use_cursor  && fsstate->conn_state->last_tupdesc)
+		{
+			ReleaseTupleDesc(fsstate->conn_state->last_tupdesc);
+			fsstate->conn_state->last_tupdesc = NULL;
+		}
+		return ExecClearTuple(slot);
+	}
 	return slot;
 }
 
@@ -1666,7 +1763,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 	if (fsstate->async_capable &&
 		fsstate->conn_state->pendingAreq &&
 		fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
-		fetch_more_data(node);
+		fetch_more_data(node, false);
 
 	/*
 	 * If any internal parameters affecting this node have changed, we'd
@@ -1677,42 +1774,44 @@ postgresReScanForeignScan(ForeignScanState *node)
 	 * case.  If we've only fetched zero or one batch, we needn't even rewind
 	 * the cursor, just rescan what we have.
 	 */
-	if (node->ss.ps.chgParam != NULL)
-	{
-		fsstate->cursor_exists = false;
-		snprintf(sql, sizeof(sql), "CLOSE c%u",
-				 fsstate->cursor_number);
-	}
-	else if (fsstate->fetch_ct_2 > 1)
+	if (pgfdw_use_cursor)
 	{
-		if (PQserverVersion(fsstate->conn) < 150000)
-			snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
-					 fsstate->cursor_number);
-		else
+		if (node->ss.ps.chgParam != NULL)
 		{
 			fsstate->cursor_exists = false;
 			snprintf(sql, sizeof(sql), "CLOSE c%u",
-					 fsstate->cursor_number);
+					fsstate->cursor_number);
 		}
-	}
-	else
-	{
-		/* Easy: just rescan what we already have in memory, if anything */
+		else if (fsstate->fetch_ct_2 > 1)
+		{
+			if (PQserverVersion(fsstate->conn) < 150000)
+				snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
+						fsstate->cursor_number);
+			else
+			{
+				fsstate->cursor_exists = false;
+				snprintf(sql, sizeof(sql), "CLOSE c%u",
+						fsstate->cursor_number);
+			}
+		}
+		else
+		{
+			/* Easy: just rescan what we already have in memory, if anything */
+			fsstate->next_tuple = 0;
+			return;
+		}
+		res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(res, fsstate->conn, sql);
+		PQclear(res);
+
+		/* Now force a fresh FETCH. */
+		fsstate->tuples = NULL;
+		fsstate->num_tuples = 0;
 		fsstate->next_tuple = 0;
-		return;
+		fsstate->fetch_ct_2 = 0;
+		fsstate->eof_reached = false;
 	}
-
-	res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(res, fsstate->conn, sql);
-	PQclear(res);
-
-	/* Now force a fresh FETCH. */
-	fsstate->tuples = NULL;
-	fsstate->num_tuples = 0;
-	fsstate->next_tuple = 0;
-	fsstate->fetch_ct_2 = 0;
-	fsstate->eof_reached = false;
 }
 
 /*
@@ -1737,6 +1836,9 @@ postgresEndForeignScan(ForeignScanState *node)
 	ReleaseConnection(fsstate->conn);
 	fsstate->conn = NULL;
 
+	/* To know if there are simulataneous queries running. */
+	fsstate->conn_state->num_queries--;
+
 	/* MemoryContexts will be deleted automatically. */
 }
 
@@ -3730,7 +3832,7 @@ create_cursor(ForeignScanState *node)
 	const char **values = fsstate->param_values;
 	PGconn	   *conn = fsstate->conn;
 	StringInfoData buf;
-	PGresult   *res;
+	PGresult   *res = NULL;
 
 	/* First, process a pending asynchronous request, if any. */
 	if (fsstate->conn_state->pendingAreq)
@@ -3755,29 +3857,69 @@ create_cursor(ForeignScanState *node)
 		MemoryContextSwitchTo(oldcontext);
 	}
 
-	/* Construct the DECLARE CURSOR command */
-	initStringInfo(&buf);
-	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
-					 fsstate->cursor_number, fsstate->query);
+	if (pgfdw_use_cursor)
+	{
+		/* Construct the DECLARE CURSOR command */
+		initStringInfo(&buf);
+		appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
+						fsstate->cursor_number, fsstate->query);
 
-	/*
-	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
-	 * to infer types for all parameters.  Since we explicitly cast every
-	 * parameter (see deparse.c), the "inference" is trivial and will produce
-	 * the desired result.  This allows us to avoid assuming that the remote
-	 * server has the same OIDs we do for the parameters' types.
-	 */
-	if (!PQsendQueryParams(conn, buf.data, numParams,
-						   NULL, values, NULL, NULL, 0))
-		pgfdw_report_error(NULL, conn, buf.data);
+		/*
+		* Notice that we pass NULL for paramTypes, thus forcing the remote server
+		* to infer types for all parameters.  Since we explicitly cast every
+		* parameter (see deparse.c), the "inference" is trivial and will produce
+		* the desired result.  This allows us to avoid assuming that the remote
+		* server has the same OIDs we do for the parameters' types.
+		*/
+		if (!PQsendQueryParams(conn, buf.data, numParams,
+							NULL, values, NULL, NULL, 0))
+			pgfdw_report_error(NULL, conn, buf.data);
 
-	/*
-	 * Get the result, and check for success.
-	 */
-	res = pgfdw_get_result(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(res, conn, fsstate->query);
-	PQclear(res);
+		/*
+		* Get the result, and check for success.
+		*/
+		res = pgfdw_get_result(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(res, conn, fsstate->query);
+		PQclear(res);
+	}
+	else
+	{
+		/* Setup to fetch without cursors */
+		if (fsstate->conn_state->last_query)
+		{
+			/*
+			 * This is to handle the case when another cursor was created
+			 * while the call to process_query_params above
+			 */
+			fillTupleSlot(fsstate, node);
+		}
+
+		initStringInfo(&buf);
+		appendStringInfo(&buf, "%s", fsstate->query);
+
+		if (!PQsendQueryParams(conn, buf.data, numParams,
+							NULL, values, NULL, NULL, 0))
+			pgfdw_report_error(NULL, conn, buf.data);
+
+		/* Call for Chunked rows mode with same size of chunk as the fetch size */
+		if (!PQsetChunkedRowsMode(conn, fsstate->fetch_size))
+			pgfdw_report_error(NULL, conn, buf.data);
+
+		/* We need to know if there are simultaneous queries running. */
+		if (fsstate->conn_state->tuplestore == NULL)
+		{
+			/* we can only populate this when the last curosrs' tuples are retreived */
+			fsstate->conn_state->num_queries++;
+			fsstate->conn_state->last_query = fsstate->query;
+			fsstate->conn_state->cursor_number = fsstate->cursor_number;
+			fsstate->conn_state->total_tuples = 0;
+			fsstate->conn_state->last_rel = fsstate->rel;
+			fsstate->conn_state->last_tupdesc = fsstate->tupdesc;
+			fsstate->conn_state->last_attinmeta = fsstate->attinmeta;
+			fsstate->conn_state->last_retrieved_attrs = fsstate->retrieved_attrs;
+		}
+	}
 
 	/* Mark the cursor as created, and show no tuples have been retrieved */
 	fsstate->cursor_exists = true;
@@ -3795,14 +3937,16 @@ create_cursor(ForeignScanState *node)
  * Fetch some more rows from the node's cursor.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool use_tts)
 {
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	PGconn	   *conn = fsstate->conn;
 	PGresult   *res;
 	int			numrows;
-	int			i;
+	int			i = 0;
 	MemoryContext oldcontext;
+	bool already_done = false;
+	EState	   *estate = node->ss.ps.state;
 
 	/*
 	 * We'll store the tuples in the batch_cxt.  First, flush the previous
@@ -3828,7 +3972,7 @@ fetch_more_data(ForeignScanState *node)
 		/* Reset per-connection state */
 		fsstate->conn_state->pendingAreq = NULL;
 	}
-	else
+	else if (pgfdw_use_cursor)
 	{
 		char		sql[64];
 
@@ -3841,24 +3985,168 @@ fetch_more_data(ForeignScanState *node)
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			pgfdw_report_error(res, conn, fsstate->query);
 	}
-
-	/* Convert the data into HeapTuples */
-	numrows = PQntuples(res);
-	fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
-	fsstate->num_tuples = numrows;
-	fsstate->next_tuple = 0;
-
-	for (i = 0; i < numrows; i++)
+	else
 	{
-		Assert(IsA(node->ss.ps.plan, ForeignScan));
+		/* Retrieve the tuples from the TupleSlot instead of actual fetch */
+		if (fsstate->conn_state->num_queries >1 && fsstate->conn_state->tuplestore &&
+			fsstate->cursor_number == fsstate->conn_state->cursor_number)
+		{
+			MemoryContextSwitchTo(fsstate->conn_state->tupstore_context);
+			already_done = true;
+			numrows = fsstate->conn_state->total_tuples;
+			fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+			fsstate->num_tuples = numrows;
+			fsstate->next_tuple = 0;
+
+			for (i = 0; i < numrows; i++)
+			{
+				while (tuplestore_gettupleslot(fsstate->conn_state->tuplestore, true, true, fsstate->conn_state->slot))
+					fsstate->tuples[i++] = ExecFetchSlotHeapTuple(fsstate->conn_state->slot, true, NULL);
+			}
+			tuplestore_end(fsstate->conn_state->tuplestore);
+			fsstate->conn_state->slot = NULL;
+			fsstate->conn_state->tuplestore = NULL;
+			fsstate->conn_state->total_tuples = 0;
+			fsstate->conn_state->cursor_number = -1;
+			fsstate->eof_reached = true;
+			MemoryContextSwitchTo(fsstate->batch_cxt);
+		}
+		else
+		{
+			/* Non-cursor mode uses PQSetChunkedRowsMode during create_cursor, so just get the result here. */
+			res = pgfdw_get_next_result(conn);
+			if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+				pgfdw_report_error(res, conn, fsstate->query);
+			else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+			{
+				/* This signifies query is completed and there are no more tuples left */
+				fsstate->eof_reached = true;
+				while (res!= NULL)
+					res = pgfdw_get_result(conn);
+			}
+			else if (PQresultStatus(res) == PGRES_TUPLES_CHUNK)
+			{
+				if (use_tts)
+				{
+					HeapTuple temp_tuple = (HeapTuple) palloc0(sizeof(HeapTuple));
+				   /*
+					* This is to fetch all the tuples of this query and save them in
+					* Tuple Slot. Since it is using PQSetChunkedRowsMode, we only get the
+					* fsstate->fetch_size tuples in one run, so keep on executing till we
+					* get NULL in PGresult i.e. all the tuples are retrieved.
+					*/
+					if (fsstate->conn_state->tuplestore)
+						MemoryContextSwitchTo(fsstate->conn_state->tupstore_context);
+					else
+					{
+						fsstate->conn_state->tupstore_context = AllocSetContextCreate(estate->es_query_cxt,
+																				"tupstore context",
+																				ALLOCSET_DEFAULT_SIZES);
+						MemoryContextSwitchTo(fsstate->conn_state->tupstore_context);
+						fsstate->conn_state->tuplestore = tuplestore_begin_heap(false, true, work_mem);
+						fsstate->conn_state->slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
+					}
+
+					i = 0;
+					for (;;)
+					{
+						CHECK_FOR_INTERRUPTS();
+						numrows = PQntuples(res);
+
+						/* Convert the data into HeapTuples */
+						Assert(IsA(node->ss.ps.plan, ForeignScan));
+						for (i = 0; i < numrows; i++)
+						{
+							temp_tuple =  make_tuple_from_result_row(res, i,
+														fsstate->rel,
+														fsstate->attinmeta,
+														fsstate->retrieved_attrs,
+														node,
+														fsstate->temp_cxt);
+							tuplestore_puttuple(fsstate->conn_state->tuplestore, temp_tuple);
+							fsstate->conn_state->total_tuples++;
+						}
+						pfree(temp_tuple);
+
+						res = pgfdw_get_next_result(conn);
+						if (res == NULL)
+							break;
+
+						else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+						{
+							while (res!= NULL)
+								res = pgfdw_get_result(conn);
+							break;
+						}
+						else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+							pgfdw_report_error(res, conn, fsstate->query);
+					}
+					MemoryContextSwitchTo(fsstate->batch_cxt);
+
+					/* EOF is reached because when we are storing all tuples to the tuplestore. */
+					fsstate->eof_reached = true;
+					already_done = true;
+					fsstate->conn_state->last_query = NULL;
+				}
+				else
+				{
+					/*
+					 *  In non-cursor mode, always copy the tuples into the TupleSlot because we never
+					 * know beforehand when another query will be fired up and then we might need these tuples
+					 */
+					if (fsstate->conn_state->tuplestore == NULL)
+					{
+						HeapTuple temp_tuple = (HeapTuple) palloc0(sizeof(HeapTuple));
+						fsstate->conn_state->tupstore_context = AllocSetContextCreate(estate->es_query_cxt,
+																				"tupstore context",
+																				ALLOCSET_DEFAULT_SIZES);
+						MemoryContextSwitchTo(fsstate->conn_state->tupstore_context);
+						fsstate->conn_state->tuplestore = tuplestore_begin_heap(false, true, work_mem);
+						fsstate->conn_state->slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
+						i = 0;
+						CHECK_FOR_INTERRUPTS();
+						numrows = PQntuples(res);
+
+						/* Convert the data into HeapTuples */
+						Assert(IsA(node->ss.ps.plan, ForeignScan));
+						for (i = 0; i < numrows; i++)
+						{
+							temp_tuple =  make_tuple_from_result_row(res, i,
+														fsstate->rel,
+														fsstate->attinmeta,
+														fsstate->retrieved_attrs,
+														node,
+														fsstate->temp_cxt);
+							tuplestore_puttuple(fsstate->conn_state->tuplestore, temp_tuple);
+							fsstate->conn_state->total_tuples++;
+						}
+						pfree(temp_tuple);
+						MemoryContextSwitchTo(fsstate->batch_cxt);
+					}
+				}
+			}
+		}
+	}
+	if (!already_done)
+	{
+		/* Convert the data into HeapTuples */
+		numrows = PQntuples(res);
+		fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+		fsstate->num_tuples = numrows;
+		fsstate->next_tuple = 0;
 
-		fsstate->tuples[i] =
-			make_tuple_from_result_row(res, i,
-									   fsstate->rel,
-									   fsstate->attinmeta,
-									   fsstate->retrieved_attrs,
-									   node,
-									   fsstate->temp_cxt);
+		for (i = 0; i < numrows; i++)
+		{
+			Assert(IsA(node->ss.ps.plan, ForeignScan));
+
+			fsstate->tuples[i] =
+				make_tuple_from_result_row(res, i,
+										fsstate->rel,
+										fsstate->attinmeta,
+										fsstate->retrieved_attrs,
+										node,
+										fsstate->temp_cxt);
+		}
 	}
 
 	/* Update fetch_ct_2 */
@@ -3941,11 +4229,32 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
 	char		sql[64];
 	PGresult   *res;
 
-	snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
-	res = pgfdw_exec_query(conn, sql, conn_state);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
-		pgfdw_report_error(res, conn, sql);
-	PQclear(res);
+	if (pgfdw_use_cursor)
+	{
+		snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
+		res = pgfdw_exec_query(conn, sql, conn_state);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			pgfdw_report_error(res, conn, sql);
+		PQclear(res);
+	}
+	else
+	{
+		while (pgfdw_get_result(conn) != NULL) {}
+		if (conn_state)
+		{
+			conn_state->last_query = NULL;
+			conn_state->num_queries = 0;
+			conn_state->total_tuples = 0;
+			conn_state->last_query_processed = true;
+			conn_state->tuplestore = NULL;
+			MemoryContextReset(conn_state->tupstore_context);
+			if (conn_state->last_tupdesc)
+			{
+				ReleaseTupleDesc(conn_state->last_tupdesc);
+				conn_state->last_tupdesc = NULL;
+			}
+		}
+	}
 }
 
 /*
@@ -5239,7 +5548,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
 	for (;;)
 	{
 		int			numrows;
-		int			i;
+		int			i = 0;
 
 		/* Allow users to cancel long query */
 		CHECK_FOR_INTERRUPTS();
@@ -5306,7 +5615,7 @@ static void
 analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
 {
 	int			targrows = astate->targrows;
-	int			pos;			/* array index to store tuple in */
+	int			pos = 0;			/* array index to store tuple in */
 	MemoryContext oldcontext;
 
 	/* Always increment sample row counter. */
@@ -5338,7 +5647,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
 			Assert(pos >= 0 && pos < targrows);
 			heap_freetuple(astate->rows[pos]);
 		}
-		else
+		else if (pgfdw_use_cursor)
 		{
 			/* Skip this tuple. */
 			pos = -1;
@@ -7314,7 +7623,7 @@ postgresForeignAsyncNotify(AsyncRequest *areq)
 	if (!PQconsumeInput(fsstate->conn))
 		pgfdw_report_error(NULL, fsstate->conn, fsstate->query);
 
-	fetch_more_data(node);
+	fetch_more_data(node, false);
 
 	produce_tuple_asynchronously(areq, true);
 }
@@ -7432,7 +7741,7 @@ process_pending_request(AsyncRequest *areq)
 	/* The request should be currently in-process */
 	Assert(fsstate->conn_state->pendingAreq == areq);
 
-	fetch_more_data(node);
+	fetch_more_data(node, false);
 
 	/*
 	 * If we didn't get any tuples, must be end of data; complete the request
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..5cf04c2b875 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -16,9 +16,11 @@
 #include "foreign/foreign.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq-be-fe.h"
+#include "libpq-int.h"
 #include "nodes/execnodes.h"
 #include "nodes/pathnodes.h"
 #include "utils/relcache.h"
+#include "funcapi.h"
 
 /*
  * FDW-specific planner information kept in RelOptInfo.fdw_private for a
@@ -137,6 +139,21 @@ typedef struct PgFdwRelationInfo
 typedef struct PgFdwConnState
 {
 	AsyncRequest *pendingAreq;	/* pending async request */
+	 /* Only to be used in the non cursor mode */
+	Tuplestorestate *tuplestore;
+	int  num_queries;
+	int cursor_number;
+	int total_tuples;
+	char *last_query;
+	bool last_query_processed;
+	MemoryContext tupstore_context;
+	Relation	last_rel;			/* relcache entry for the foreign table. NULL
+											for a foreign join scan. */
+	TupleDesc	last_tupdesc;		/* tuple descriptor of scan */
+	AttInMetadata *last_attinmeta;	/* attribute datatype conversion metadata */
+	TupleTableSlot *slot;
+	/* extracted fdw_private data */
+	List	   *last_retrieved_attrs;	/* list of retrieved attribute numbers */
 } PgFdwConnState;
 
 /*
@@ -164,6 +181,7 @@ extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern void do_sql_command(PGconn *conn, const char *sql);
 extern PGresult *pgfdw_get_result(PGconn *conn);
+extern PGresult *pgfdw_get_next_result(PGconn *conn);
 extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
 								  PgFdwConnState *state);
 pg_noreturn extern void pgfdw_report_error(PGresult *res, PGconn *conn,
@@ -179,6 +197,7 @@ extern List *ExtractExtensionList(const char *extensionsString,
 								  bool warnOnMissing);
 extern char *process_pgfdw_appname(const char *appname);
 extern char *pgfdw_application_name;
+extern bool pgfdw_use_cursor;
 
 /* in deparse.c */
 extern void classifyConditions(PlannerInfo *root,
@@ -258,5 +277,4 @@ extern const char *get_jointype_name(JoinType jointype);
 /* in shippable.c */
 extern bool is_builtin(Oid objectId);
 extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
-
 #endif							/* POSTGRES_FDW_H */
-- 
2.39.5 (Apple Git-154)

