On Tue, Mar 28, 2017 at 10:00 AM, Tomas Vondra <[email protected]
> wrote:
>
>
> On 03/27/2017 01:40 PM, Rushabh Lathia wrote:
>
>>
>> ...
>> I was doing more testing with the patch and I found one more server
>> crash with the patch around same area, when we forced the gather
>> merge for the scan having zero rows.
>>
>> create table dept ( deptno numeric, dname varchar(20));
>> set parallel_tuple_cost =0;
>> set parallel_setup_cost =0;
>> set min_parallel_table_scan_size =0;
>> set min_parallel_index_scan_size =0;
>> set force_parallel_mode=regress;
>> explain analyze select * from dept order by deptno;
>>
>> This is because for leader we don't initialize the slot into gm_slots. So
>> in case where launched worker is zero and table having zero rows, we
>> end up having NULL slot into gm_slots array.
>>
>> Currently gather_merge_clear_slots() clears out the tuple table slots for
>> each
>> gather merge input and returns a cleared slot. In the patch I modified
>> the function
>> gather_merge_clear_slots() to just clear out the tuple table slots and
>> always return NULL when all the queues and the heap are exhausted.
>>
>>
> Isn't that just another sign the code might be a bit too confusing? I see
> two main issues in the code:
>
> 1) allocating 'slots' as 'nreaders+1' elements, which seems like a good
> way to cause off-by-one errors
>
> 2) mixing objects with different life spans (slots for readers vs. slot
> for the leader) seems like a bad idea too
>
> I wonder how much we gain by reusing the slot from the leader (I'd be
> surprised if it was at all measurable). David posted a patch reworking
> this, and significantly simplifying the GatherMerge node. Why not to accept
> that?
>
>
>
I think we all agree that we should get rid of nreaders from the
GatherMergeState
and need to do some code re-factoring. But if I understood correctly,
Robert's
concern was to do that re-factoring as a separate commit.
I picked David's patch and started reviewing the changes. I applied that
patch
on top of my v2 patch (which does the re-factor of
gather_merge_clear_slots).
In David's patch, in gather_merge_init(), the loop where the tuple array
gets
allocated needs to run only up to nworkers_launched, because we don't
hold a tuple array for the leader. I changed that and made some other simple
changes based on my v2 patch. I also performed manual testing with
the changes.
Please find attached the re-factoring patch, which applies on top of the v2
patch submitted for the server crash fix. (Attaching both patches here again,
for ease of access.)
Thanks,
--
Rushabh Lathia
www.EnterpriseDB.com
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 62c399e..2d7eb71 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -195,9 +195,9 @@ ExecGatherMerge(GatherMergeState *node)
/* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0)
{
- node->nreaders = 0;
- node->reader = palloc(pcxt->nworkers_launched *
- sizeof(TupleQueueReader *));
+ node->reader = (TupleQueueReader **)
+ palloc(pcxt->nworkers_launched *
+ sizeof(TupleQueueReader *));
Assert(gm->numCols);
@@ -205,7 +205,7 @@ ExecGatherMerge(GatherMergeState *node)
{
shm_mq_set_handle(node->pei->tqueue[i],
pcxt->worker[i].bgwhandle);
- node->reader[node->nreaders++] =
+ node->reader[i] =
CreateTupleQueueReader(node->pei->tqueue[i],
node->tupDesc);
}
@@ -298,7 +298,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{
int i;
- for (i = 0; i < node->nreaders; ++i)
+ for (i = 0; i < node->nworkers_launched; ++i)
if (node->reader[i])
DestroyTupleQueueReader(node->reader[i]);
@@ -344,28 +344,26 @@ ExecReScanGatherMerge(GatherMergeState *node)
static void
gather_merge_init(GatherMergeState *gm_state)
{
- int nreaders = gm_state->nreaders;
+ int nslots = gm_state->nworkers_launched + 1;
bool initialize = true;
int i;
/*
* Allocate gm_slots for the number of worker + one more slot for leader.
- * Last slot is always for leader. Leader always calls ExecProcNode() to
- * read the tuple which will return the TupleTableSlot. Later it will
- * directly get assigned to gm_slot. So just initialize leader gm_slot
- * with NULL. For other slots below code will call
- * ExecInitExtraTupleSlot() which will do the initialization of worker
- * slots.
+ * The final slot in the array is reserved for the leader process. This
+ * slot is always populated via ExecProcNode(). This can be set to NULL
+ * for now. The remaining slots we'll initialize with a call to
+ * ExecInitExtraTupleSlot().
*/
- gm_state->gm_slots =
- palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
- gm_state->gm_slots[gm_state->nreaders] = NULL;
+ gm_state->gm_slots = (TupleTableSlot **)
+ palloc(nslots * sizeof(TupleTableSlot *));
+ gm_state->gm_slots[nslots - 1] = NULL; /* nullify leader's slot */
- /* Initialize the tuple slot and tuple array for each worker */
+ /* Initialize the tuple slot and tuple array for each reader */
gm_state->gm_tuple_buffers =
- (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
- (gm_state->nreaders + 1));
- for (i = 0; i < gm_state->nreaders; i++)
+ (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * nslots);
+
+ for (i = 0; i < gm_state->nworkers_launched; i++)
{
/* Allocate the tuple array with MAX_TUPLE_STORE size */
gm_state->gm_tuple_buffers[i].tuple =
@@ -378,7 +376,7 @@ gather_merge_init(GatherMergeState *gm_state)
}
/* Allocate the resources for the merge */
- gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1,
+ gm_state->gm_heap = binaryheap_allocate(nslots,
heap_compare_slots,
gm_state);
@@ -388,10 +386,10 @@ gather_merge_init(GatherMergeState *gm_state)
* leader. After this, if all active workers are unable to produce a
* tuple, then re-read and this time use wait mode. For workers that were
* able to produce a tuple in the earlier loop and are still active, just
- * try to fill the tuple array if more tuples are avaiable.
+ * try to fill the tuple array if more tuples are available.
*/
reread:
- for (i = 0; i < nreaders + 1; i++)
+ for (i = 0; i < nslots; i++)
{
if (!gm_state->gm_tuple_buffers[i].done &&
(TupIsNull(gm_state->gm_slots[i]) ||
@@ -408,7 +406,7 @@ reread:
}
initialize = false;
- for (i = 0; i < nreaders; i++)
+ for (i = 0; i < nslots; i++)
if (!gm_state->gm_tuple_buffers[i].done &&
(TupIsNull(gm_state->gm_slots[i]) ||
gm_state->gm_slots[i]->tts_isempty))
@@ -419,14 +417,14 @@ reread:
}
/*
- * Clear out the tuple table slots for each gather merge input.
+ * Clear out the tuple table slots for each gather merge workers.
*/
static void
gather_merge_clear_slots(GatherMergeState *gm_state)
{
int i;
- for (i = 0; i < gm_state->nreaders; i++)
+ for (i = 0; i < gm_state->nworkers_launched; i++)
{
pfree(gm_state->gm_tuple_buffers[i].tuple);
gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
@@ -492,13 +490,15 @@ gather_merge_getnext(GatherMergeState *gm_state)
static void
form_tuple_array(GatherMergeState *gm_state, int reader)
{
- GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+ GMReaderTupleBuffer *tuple_buffer;
int i;
/* Last slot is for leader and we don't build tuple array for leader */
- if (reader == gm_state->nreaders)
+ if (reader == gm_state->nworkers_launched)
return;
+ tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
/*
* We here because we already read all the tuples from the tuple array, so
* initialize the counter to zero.
@@ -537,7 +537,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
* If we're being asked to generate a tuple from the leader, then we
* just call ExecProcNode as normal to produce one.
*/
- if (gm_state->nreaders == reader)
+ if (gm_state->nworkers_launched == reader)
{
if (gm_state->need_to_scan_locally)
{
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 11a6850..e8c08c6 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1864,18 +1864,20 @@ typedef struct GatherMergeState
PlanState ps; /* its first field is NodeTag */
bool initialized;
struct ParallelExecutorInfo *pei;
- int nreaders;
- int nworkers_launched;
- struct TupleQueueReader **reader;
+ int nworkers_launched; /* number of parallel workers launched */
+ struct TupleQueueReader **reader; /* array of readers, nworkers_launched
+ * long */
TupleDesc tupDesc;
- TupleTableSlot **gm_slots;
- struct binaryheap *gm_heap; /* binary heap of slot indices */
+ TupleTableSlot **gm_slots; /* array of Tuple slots, nworkers_launched + 1
+ * long */
+ struct binaryheap *gm_heap; /* binary heap of slot indices */
bool gm_initialized; /* gather merge initilized ? */
bool need_to_scan_locally;
int gm_nkeys;
SortSupport gm_sortkeys; /* array of length ms_nkeys */
- struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per
- * reader */
+ struct GMReaderTupleBuffer *gm_tuple_buffers; /* array of tuple buffers,
+ * nworkers_launched + 1
+ * long */
} GatherMergeState;
/* ----------------
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 3f0c3ee..62c399e 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -419,10 +419,9 @@ reread:
}
/*
- * Clear out the tuple table slots for each gather merge input,
- * and return a cleared slot.
+ * Clear out the tuple table slots for each gather merge input.
*/
-static TupleTableSlot *
+static void
gather_merge_clear_slots(GatherMergeState *gm_state)
{
int i;
@@ -437,9 +436,6 @@ gather_merge_clear_slots(GatherMergeState *gm_state)
pfree(gm_state->gm_tuple_buffers);
/* Free the binaryheap, which was created for sort */
binaryheap_free(gm_state->gm_heap);
-
- /* return any clear slot */
- return gm_state->gm_slots[0];
}
/*
@@ -479,7 +475,8 @@ gather_merge_getnext(GatherMergeState *gm_state)
if (binaryheap_empty(gm_state->gm_heap))
{
/* All the queues are exhausted, and so is the heap */
- return gather_merge_clear_slots(gm_state);
+ gather_merge_clear_slots(gm_state);
+ return NULL;
}
else
{
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers