Hi all. A brief overview of our use case follows.

We are developing a foreign data wrapper that supports parallel scan
and predicate pushdown; given the kinds of queries we run, foreign
scans can be very long and often return very few rows.

As the scan can be very long and slow, we'd like to provide partial
results to the user as rows are being returned. We found two problems
with that:
1. The leader backend would not poll the parallel workers' queues until
it had itself found a row to return; we worked around this by setting
`parallel_leader_participation` to off (see the sketch after this list).
2. Parallel workers' tuple queues are buffered and are not flushed until
a certain fill threshold is reached; as our queries yield few result
rows, they would often only reach the leader at the end of the (very
long) scan.

The proposal is to add a `parallel_tuplequeue_autoflush` GUC (bool,
default false) that forces every row returned by a parallel worker to
be flushed to the leader immediately; this was already the behaviour
before v15, so the GUC simply allows opting back into it.
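
Assuming the attached patch, usage would be a plain SET, since the GUC
is defined as PGC_USERSET; for our case we would combine it with the
workaround above:

  SET parallel_tuplequeue_autoflush = on;  -- proposed GUC, default off

With that set, every row a parallel worker produces is flushed to the
leader immediately, as described above.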

This would be achieved by adding an `auto_flush` field to
`TQueueDestReceiver`, so that `tqueueReceiveSlot` passes
`force_flush = true` to `shm_mq_send` whenever the GUC is enabled.

The attached patch, tested on master @ 1ab67c9dfaadda, is a tentative
proof-of-concept implementation.
Based on feedback, we're happy to work on a complete and properly
documented patch.

Thanks in advance for your consideration.

Regards,
Francesco
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index bfb3419efb..77b3cdbfbb 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -48,6 +48,7 @@
 #include "utils/dsa.h"
 #include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
+#include "optimizer/optimizer.h"
 
 /*
  * Magic numbers for parallel executor communication.  We use constants
@@ -136,7 +137,7 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
 												SharedExecutorInstrumentation *instrumentation);
 
 /* Helper function that runs in the parallel worker. */
-static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
+static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc, bool auto_flush);
 
 /*
  * Create a serialized representation of the plan to be sent to each worker.
@@ -1220,7 +1221,7 @@ ExecParallelCleanup(ParallelExecutorInfo *pei)
  * for that purpose.
  */
 static DestReceiver *
-ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
+ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc, bool auto_flush)
 {
 	char	   *mqspace;
 	shm_mq	   *mq;
@@ -1229,7 +1230,7 @@ ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
 	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
 	mq = (shm_mq *) mqspace;
 	shm_mq_set_sender(mq, MyProc);
-	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
+	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL), auto_flush);
 }
 
 /*
@@ -1417,8 +1418,8 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	/* Get fixed-size state. */
 	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
 
 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
-	receiver = ExecParallelGetReceiver(seg, toc);
+	receiver = ExecParallelGetReceiver(seg, toc, parallel_tuplequeue_autoflush);
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
 	if (instrumentation != NULL)
 		instrument_options = instrumentation->instrument_options;
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index ae3ef69ca9..f54f912e9e 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -31,6 +31,7 @@ typedef struct TQueueDestReceiver
 {
 	DestReceiver pub;			/* public fields */
 	shm_mq_handle *queue;		/* shm_mq to send to */
+	bool		auto_flush;		/* flush the queue after each tuple? */
 } TQueueDestReceiver;
 
 /*
@@ -60,7 +61,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the tuple itself. */
 	tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
-	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
+	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, tqueue->auto_flush);
 
 	if (should_free)
 		pfree(tuple);
@@ -116,7 +117,7 @@ tqueueDestroyReceiver(DestReceiver *self)
  * Create a DestReceiver that writes tuples to a tuple queue.
  */
 DestReceiver *
-CreateTupleQueueDestReceiver(shm_mq_handle *handle)
+CreateTupleQueueDestReceiver(shm_mq_handle *handle, bool auto_flush)
 {
 	TQueueDestReceiver *self;
 
@@ -128,6 +129,7 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
 	self->pub.rDestroy = tqueueDestroyReceiver;
 	self->pub.mydest = DestTupleQueue;
 	self->queue = handle;
+	self->auto_flush = auto_flush;
 
 	return (DestReceiver *) self;
 }
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index d92d43a17e..0dfb9ca471 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -67,6 +67,7 @@
 double		cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION;
 int			debug_parallel_query = DEBUG_PARALLEL_OFF;
 bool		parallel_leader_participation = true;
+bool		parallel_tuplequeue_autoflush = false;
 
 /* Hook for plugins to get control in planner() */
 planner_hook_type planner_hook = NULL;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 96f80b3046..a0cd5e888c 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -151,7 +151,7 @@ CreateDestReceiver(CommandDest dest)
 			return CreateTransientRelDestReceiver(InvalidOid);
 
 		case DestTupleQueue:
-			return CreateTupleQueueDestReceiver(NULL);
+			return CreateTupleQueueDestReceiver(NULL, false);
 
 		case DestExplainSerialize:
 			return CreateExplainSerializeDestReceiver(NULL);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 686309db58..dc432ef249 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -1961,6 +1961,17 @@ struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"parallel_tuplequeue_autoflush", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Controls whether parallel workers flush their tuple queue after each tuple."),
+			gettext_noop("When enabled, each tuple sent by a parallel worker is made visible to the leader immediately instead of being buffered in the tuple queue."),
+			GUC_EXPLAIN
+		},
+		&parallel_tuplequeue_autoflush,
+		false,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"jit", PGC_USERSET, QUERY_TUNING_OTHER,
 			gettext_noop("Allow JIT compilation."),
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 093a6b8875..b0d41a9a1a 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -21,7 +21,7 @@
 typedef struct TupleQueueReader TupleQueueReader;
 
 /* Use this to send tuples to a shm_mq. */
-extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
+extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle, bool auto_flush);
 
 /* Use these to receive tuples from a shm_mq. */
 extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h
index 93e3dc719d..b7b184abd1 100644
--- a/src/include/optimizer/optimizer.h
+++ b/src/include/optimizer/optimizer.h
@@ -111,6 +111,7 @@ typedef enum
 /* GUC parameters */
 extern PGDLLIMPORT int debug_parallel_query;
 extern PGDLLIMPORT bool parallel_leader_participation;
+extern PGDLLIMPORT bool parallel_tuplequeue_autoflush;
 
 extern struct PlannedStmt *planner(Query *parse, const char *query_string,
 								   int cursorOptions,
