Hi,

Looking at the latest version of the tuplestore patch:


diff --git a/src/backend/utils/sort/sharedtuplestore.c 
b/src/backend/utils/sort/sharedtuplestore.c
new file mode 100644
index 00000000000..d1233221a58
--- /dev/null
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -0,0 +1,583 @@
+/*-------------------------------------------------------------------------
+ *
+ * sharedtuplestore.c
+ *       Simple mechanism for sharing tuples between backends.
+ *
+ * This module provides a shared temporary tuple storage mechanism, providing
+ * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
+ * can write to a SharedTuplestore, and then multiple backends can later scan
+ * the stored tuples.  Currently, the only scan type supported is a parallel
+ * scan where each backend reads an arbitrary subset of the tuples that were
+ * written.

Cool.


+/* Chunk written to disk. */
+typedef struct SharedTuplestoreChunk
+{
+       int                     ntuples;                /* Number of tuples in 
this chunk. */
+       bool            overflow;               /* Continuation of previous 
chunk? */
+       char            data[FLEXIBLE_ARRAY_MEMBER];
+} SharedTuplestoreChunk;

Ah. I was thinking we could have the 'overflow' variable be an int,
indicating the remaining length of the oversized tuple. That'd allow us
to skip ahead to the end of the oversized tuple in concurrent processes
after hitting it.



+/*
+ * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
+ * pointer to meta data of that size must be provided.
+ */
+void
+sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
+                        MinimalTuple tuple)
+{
+       size_t          size;
+
+       /* Do we have our own file yet? */
+       if (accessor->write_file == NULL)
+       {
+               SharedTuplestoreParticipant *participant;
+               char            name[MAXPGPATH];
+
+               /* Create one.  Only this backend will write into it. */
+               sts_filename(name, accessor, accessor->participant);
+               accessor->write_file = BufFileCreateShared(accessor->fileset, 
name);
+
+               /* Set up the shared state for this backend's file. */
+               participant = 
&accessor->sts->participants[accessor->participant];
+               participant->writing = true;    /* for assertions only */
+       }
+
+       /* Do we have space? */
+       size = accessor->sts->meta_data_size + tuple->t_len;
+       if (accessor->write_pointer + size >= accessor->write_end)
+       {
+               if (accessor->write_chunk == NULL)
+               {
+                       /* First time through.  Allocate chunk. */
+                       accessor->write_chunk = (SharedTuplestoreChunk *)
+                               MemoryContextAllocZero(accessor->context,
+                                                                          
STS_CHUNK_PAGES * BLCKSZ);
+                       accessor->write_chunk->ntuples = 0;
+                       accessor->write_pointer = 
&accessor->write_chunk->data[0];
+                       accessor->write_end = (char *)
+                               accessor->write_chunk + STS_CHUNK_PAGES * 
BLCKSZ;
+               }
+               else
+               {
+                       /* See if flushing helps. */
+                       sts_flush_chunk(accessor);
+               }
+
+               /* It may still not be enough in the case of a gigantic tuple. 
*/
+               if (accessor->write_pointer + size >= accessor->write_end)
+               {
+                       size_t          written;
+
+                       /*
+                        * We'll write the beginning of the oversized tuple, 
and then
+                        * write the rest in some number of 'overflow' chunks.
+                        */
+                       if (accessor->write_pointer + 
accessor->sts->meta_data_size >=
+                               accessor->write_end)
+                               elog(ERROR, "meta-data too long");

That seems more like an Assert than a proper elog? Given that we're
calculating size just a few lines above...


+                       if (accessor->sts->meta_data_size > 0)
+                               memcpy(accessor->write_pointer, meta_data,
+                                          accessor->sts->meta_data_size);
+                       written = accessor->write_end - accessor->write_pointer 
-
+                               accessor->sts->meta_data_size;
+                       memcpy(accessor->write_pointer + 
accessor->sts->meta_data_size,
+                                  tuple, written);

Also, shouldn't the same Assert() be here as well if you have it above?

+static MinimalTuple
+sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+       MinimalTuple tuple;
+       uint32          size;
+       size_t          remaining_size;
+       size_t          this_chunk_size;
+       char       *destination;
+
+       /*
+        * We'll keep track of bytes read from this chunk so that we can detect 
an
+        * overflowing tuples and switch to reading overflow pages.
+        */
+       if (accessor->sts->meta_data_size > 0)
+       {
+               if (BufFileRead(accessor->read_file,
+                                               meta_data,
+                                               accessor->sts->meta_data_size) 
!=
+                       accessor->sts->meta_data_size)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read from shared 
tuplestore temporary file"),
+                                        errdetail("Short read while reading 
meta-data")));

The errdetail doesn't follow the style guide (not a sentence ending with
.), and seems internal-ish. I'm ok with keeping it, but perhaps we
should change all these to be errdetail_internal()? Seems pointless to
translate all of them.

+MinimalTuple
+sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
+{
+       SharedTuplestoreParticipant *p;
+       BlockNumber     read_page;
+       bool            eof;
+
+       for (;;)
+       {
+               /* Can we read more tuples from the current chunk? */
+               if (accessor->read_ntuples < accessor->read_ntuples_available)
+                       return sts_read_tuple(accessor, meta_data);
+
+               /* Find the location of a new chunk to read. */
+               p = &accessor->sts->participants[accessor->read_participant];
+
+               LWLockAcquire(&p->lock, LW_EXCLUSIVE);
+               eof = p->read_page >= p->npages;
+               if (!eof)
+               {
+                       read_page = p->read_page;
+                       p->read_page += STS_CHUNK_PAGES;
+               }
+               LWLockRelease(&p->lock);

So if we went to the world I'm suggesting, with overflow containing the
length till the end of the tuple, this'd probably would have to look a
bit different.


+               if (!eof)
+               {
+                       SharedTuplestoreChunk chunk_header;
+
+                       /* Make sure we have the file open. */
+                       if (accessor->read_file == NULL)
+                       {
+                               char            name[MAXPGPATH];
+
+                               sts_filename(name, accessor, 
accessor->read_participant);
+                               accessor->read_file =
+                                       BufFileOpenShared(accessor->fileset, 
name);
+                               if (accessor->read_file == NULL)
+                                       elog(ERROR, "could not open temporary 
file %s", name);

Isn't this more an Assert or just not anything? There's now way
BufFileOpenShared should ever return NULL, no?

+
+                       /* Seek and load the chunk header. */
+                       if (BufFileSeekBlock(accessor->read_file, read_page) != 
0)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read from 
shared tuplestore temporary file"),
+                                                errdetail("Could not seek to 
next block")));
+                       if (BufFileRead(accessor->read_file, &chunk_header,
+                                                       
offsetof(SharedTuplestoreChunk, data)) !=
+                               offsetof(SharedTuplestoreChunk, data))
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not read from 
shared tuplestore temporary file"),
+                                                errdetail("Short read while 
reading chunk header")));
+
+                       /* If this is an overflow chunk, we skip it. */
+                       if (chunk_header.overflow)
+                               continue;
+
+                       accessor->read_ntuples = 0;
+                       accessor->read_ntuples_available = chunk_header.ntuples;
+                       accessor->read_bytes = offsetof(SharedTuplestoreChunk, 
data);

Perhaps somewhere around here comment that we'll just loop around and
call sts_read_tuple() in the next loop iteration?


Greetings,

Andres Freund

Reply via email to