Hi,

On 2020-06-03 15:53:24 +0530, vignesh C wrote:
> Workers / Exec time (seconds)
>
> (A) copy from file, 2 indexes on integer columns, 1 index on text column
> (B) copy from stdin, 2 indexes on integer columns, 1 index on text column
> (C) copy from file, 1 gist index on text column
> (D) copy from file, 3 indexes on integer columns
> (E) copy from stdin, 3 indexes on integer columns
>
> Workers  (A)                (B)                (C)                (D)                (E)
> 0        1162.772(1X)       1176.035(1X)       827.669(1X)        216.171(1X)        217.376(1X)
> 1        1110.288(1.05X)    1120.556(1.05X)    747.384(1.11X)     174.242(1.24X)     163.492(1.33X)
> 2        635.249(1.83X)     668.18(1.76X)      435.673(1.9X)      133.829(1.61X)     126.516(1.72X)
> 4        336.835(3.45X)     346.768(3.39X)     236.406(3.5X)      105.767(2.04X)     107.382(2.02X)
> 8        188.577(6.17X)     194.491(6.04X)     148.962(5.56X)     100.708(2.15X)     107.72(2.01X)
> 16       126.819(9.17X)     146.402(8.03X)     119.923(6.9X)      97.996(2.2X)       106.531(2.04X)
> 20       *117.845(9.87X)*   149.203(7.88X)     138.741(5.96X)     97.94(2.21X)       107.5(2.02)
> 30       127.554(9.11X)     161.218(7.29X)     172.443(4.8X)      98.232(2.2X)       108.778(1.99X)

Hm. You don't explicitly mention that in your design, but given how
small the benefit of going from 0 to 1 workers is, I assume the leader
doesn't do any "chunk processing" on its own?


> Design of the Parallel Copy: The backend, to which the "COPY FROM" query is
> submitted acts as leader with the responsibility of reading data from the
> file/stdin, launching at most n number of workers as specified with
> PARALLEL 'n' option in the "COPY FROM" query. The leader populates the
> common data required for the workers execution in the DSM and shares it
> with the workers. The leader then executes before statement triggers if
> there exists any. Leader populates DSM chunks which includes the start
> offset and chunk size, while populating the chunks it reads as many blocks
> as required into the DSM data blocks from the file. Each block is of 64K
> size. The leader parses the data to identify a chunk, the existing logic
> from CopyReadLineText which identifies the chunks with some changes was
> used for this. Leader checks if a free chunk is available to copy the
> information, if there is no free chunk it waits till the required chunk is
> freed up by the worker and then copies the identified chunks information
> (offset & chunk size) into the DSM chunks. This process is repeated till
> the complete file is processed. Simultaneously, the workers cache the
> chunks(50) locally into the local memory and release the chunks to the
> leader for further populating. Each worker processes the chunk that it
> cached and inserts it into the table. The leader waits till all the chunks
> populated are processed by the workers and exits.

Why do we need the local copy of 50 chunks? Copying memory around is far
from free. I don't see why it'd be better to add per-process caching,
rather than making the DSM bigger? I can see some benefit in marking
multiple chunks as being processed with one lock acquisition, but I
don't think adding a memory copy is a good idea.
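
To illustrate the "one acquisition for multiple chunks" idea without a
local copy, here is a minimal sketch. Everything in it (SketchRing,
sketch_claim_batch, the field names, the use of C11 atomics instead of
pg_atomic_*) is made up for illustration and is not from the patch. It
also ignores ring wraparound and the leader waiting for free entries.
A worker claims a range of already-populated ring entries with a single
compare-and-swap and then reads them in place from shared memory:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define SKETCH_RING_SIZE   1000  /* hypothetical, not the patch's RINGSIZE */
#define SKETCH_CLAIM_BATCH 50    /* entries claimed per atomic operation */

typedef struct SketchRing
{
    _Atomic uint64_t next_unclaimed;  /* first entry no worker has claimed */
    _Atomic uint64_t populated;       /* entries the leader has filled so far */
    uint32_t    start_offset[SKETCH_RING_SIZE];
    uint32_t    size[SKETCH_RING_SIZE];
} SketchRing;

/*
 * Claim up to SKETCH_CLAIM_BATCH populated entries with one successful
 * compare-and-swap. Returns the number of entries claimed and stores the
 * position of the first one in *first. The caller then processes
 * positions [*first, *first + n) directly; nothing is copied.
 */
static uint32_t
sketch_claim_batch(SketchRing *ring, uint64_t *first)
{
    uint64_t    cur = atomic_load(&ring->next_unclaimed);

    for (;;)
    {
        /* populated only grows and is read after cur, so this cannot underflow */
        uint64_t    avail = atomic_load(&ring->populated) - cur;
        uint32_t    n = avail < SKETCH_CLAIM_BATCH ? (uint32_t) avail
                                                   : SKETCH_CLAIM_BATCH;

        if (n == 0)
            return 0;           /* nothing populated yet */
        if (atomic_compare_exchange_weak(&ring->next_unclaimed, &cur, cur + n))
        {
            *first = cur;
            return n;
        }
        /* the failed CAS reloaded cur with the current value; retry */
    }
}
```

With 120 populated entries, three successive calls would hand out
batches of 50, 50 and 20 entries; the per-worker memory copy goes away
and the synchronization cost is still paid once per batch.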


This patch *desperately* needs to be split up. It imo is close to
unreviewable, because a large number of changes that just move code
around, without any functional change, are mixed in with the actual
new stuff.


>  /*
> + * State of the chunk.
> + */
> +typedef enum ChunkState
> +{
> +     CHUNK_INIT,                     /* initial state of chunk */
> +     CHUNK_LEADER_POPULATING,        /* leader processing chunk */
> +     CHUNK_LEADER_POPULATED,         /* leader completed populating chunk */
> +     CHUNK_WORKER_PROCESSING,        /* worker processing chunk */
> +     CHUNK_WORKER_PROCESSED          /* worker completed processing chunk */
> +}ChunkState;
> +
> +#define RAW_BUF_SIZE 65536           /* we palloc RAW_BUF_SIZE+1 bytes */
> +
> +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> +#define RINGSIZE (10 * 1000)
> +#define MAX_BLOCKS_COUNT 1000
> +#define WORKER_CHUNK_COUNT 50        /* should be mod of RINGSIZE */
> +
> +#define      IsParallelCopy()                (cstate->is_parallel)
> +#define IsLeader()                           (cstate->pcdata->is_leader)
> +#define IsHeaderLine()                       (cstate->header_line && cstate->cur_lineno == 1)
> +
> +/*
> + * Copy data block information.
> + */
> +typedef struct CopyDataBlock
> +{
> +     /* The number of unprocessed chunks in the current block. */
> +     pg_atomic_uint32 unprocessed_chunk_parts;
> +
> +     /*
> +      * If the current chunk data is continued into another block,
> +      * following_block will have the position where the remaining data
> +      * need to be read.
> +      */
> +     uint32  following_block;
> +
> +     /*
> +      * This flag will be set, when the leader finds out this block can be
> +      * read safely by the worker. This helps the worker to start processing
> +      * the chunk early where the chunk will be spread across many blocks and
> +      * the worker need not wait for the complete chunk to be processed.
> +      */
> +     bool   curr_blk_completed;
> +     char   data[DATA_BLOCK_SIZE + 1]; /* data read from file */
> +}CopyDataBlock;

What's the + 1 here about?


> +/*
> + * Parallel copy line buffer information.
> + */
> +typedef struct ParallelCopyLineBuf
> +{
> +     StringInfoData          line_buf;
> +     uint64                          cur_lineno;     /* line number for 
> error messages */
> +}ParallelCopyLineBuf;

Why do we need separate infrastructure for this? We shouldn't duplicate
infrastructure unnecessarily.



> +/*
> + * Common information that need to be copied to shared memory.
> + */
> +typedef struct CopyWorkerCommonData
> +{

Why is parallel specific stuff here suddenly not named ParallelCopy*
anymore? If you introduce a naming like that it imo should be used
consistently.

> +     /* low-level state data */
> +     CopyDest            copy_dest;          /* type of copy source/destination */
> +     int                 file_encoding;      /* file or remote side's character encoding */
> +     bool                need_transcoding;   /* file encoding diff from server? */
> +     bool                encoding_embeds_ascii;      /* ASCII can be non-first byte? */
> +
> +     /* parameters from the COPY command */
> +     bool                csv_mode;           /* Comma Separated Value format? */
> +     bool                header_line;        /* CSV header line? */
> +     int                 null_print_len; /* length of same */
> +     bool                force_quote_all;    /* FORCE_QUOTE *? */
> +     bool                convert_selectively;        /* do selective binary conversion? */
> +
> +     /* Working state for COPY FROM */
> +     AttrNumber          num_defaults;
> +     Oid                 relid;
> +}CopyWorkerCommonData;

But I actually think we shouldn't have this information in two different
structs. This should exist once, independent of using parallel /
non-parallel copy.
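
A rough sketch of what "exist once" could look like. All names here
(SketchCopyOptions, SketchCopyState, the two helper functions) are
hypothetical and only a handful of fields are shown. The settings live
in a single struct that the per-backend state embeds, and that same
struct is what gets copied into shared memory:

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical: settings both serial and parallel COPY need, defined once. */
typedef struct SketchCopyOptions
{
    int         file_encoding;
    bool        need_transcoding;
    bool        csv_mode;
    bool        header_line;
} SketchCopyOptions;

/* Per-backend state embeds the options instead of duplicating the fields. */
typedef struct SketchCopyState
{
    SketchCopyOptions opts;
    char       *raw_buf;        /* backend-local, never put in shared memory */
} SketchCopyState;

/* Leader: publish the options by copying the one struct into shared memory. */
static void
sketch_publish_options(void *shm, const SketchCopyState *leader)
{
    memcpy(shm, &leader->opts, sizeof(SketchCopyOptions));
}

/* Worker: restore the same struct, no field-by-field translation needed. */
static void
sketch_restore_options(SketchCopyState *worker, const void *shm)
{
    memcpy(&worker->opts, shm, sizeof(SketchCopyOptions));
}
```

This only works as-is for fields without pointers; anything
pointer-valued would still need explicit serialization.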


> +/* List information */
> +typedef struct ListInfo
> +{
> +     int     count;          /* count of attributes */
> +
> +     /* string info in the form info followed by info1, info2... infon  */
> +     char    info[1];
> +} ListInfo;

Based on these comments I have no idea what this could be for.


>  /*
> - * This keeps the character read at the top of the loop in the buffer
> - * even if there is more than one read-ahead.
> + * This keeps the character read at the top of the loop in the buffer
> + * even if there is more than one read-ahead.
> + */
> +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> +if (1) \
> +{ \
> +     if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> +     { \
> +             if (IsParallelCopy()) \
> +             { \
> +                     copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> +                     if (copy_buff_state.block_switched) \
> +                     { \
> +                             pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> +                             copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> +                     } \
> +             } \
> +             copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
> +             need_data = true; \
> +             continue; \
> +     } \
> +} else ((void) 0)

I think it's an absolutely clear no-go to add new branches to
these. They're *really* hot already, and this is going to sprinkle a
significant amount of new instructions over a lot of places.
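
As a heavily simplified illustration of the concern (this is not the
real CopyReadLineText logic, it ignores CSV quoting, escapes and
refills, and every name in it is made up): the per-character scan can
stay free of any parallel-mode check if the mode-specific bookkeeping
is done once per line, outside the hot loop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct SketchScanState
{
    bool        is_parallel;      /* hypothetical stand-in for cstate->is_parallel */
    size_t      lines_seen;
    size_t      parallel_updates; /* stand-in for shared-memory bookkeeping */
} SketchScanState;

/* Hot loop: only looks for the end of the line, no mode checks in here. */
static inline size_t
sketch_find_eol(const char *buf, size_t pos, size_t len)
{
    while (pos < len && buf[pos] != '\n')
        pos++;
    return pos;
}

/* Cold path: runs once per line, so a branch on the mode is cheap here. */
static void
sketch_end_of_line(SketchScanState *st)
{
    st->lines_seen++;
    if (st->is_parallel)
        st->parallel_updates++;
}

/* Scan a buffer, counting only lines that are terminated by a newline. */
static void
sketch_scan(SketchScanState *st, const char *buf, size_t len)
{
    size_t      pos = 0;

    while (pos < len)
    {
        pos = sketch_find_eol(buf, pos, len);
        if (pos < len)
        {
            sketch_end_of_line(st);
            pos++;              /* skip the newline itself */
        }
    }
}
```

The branch on is_parallel is then executed once per line rather than at
every read-ahead check inside the character loop.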


> +/*
> + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file
> + * data must be read.
> + */
> +#define SET_RAWBUF_FOR_LOAD() \
> +{ \
> +     ShmCopyInfo     *pcshared_info = cstate->pcdata->pcshared_info; \
> +     uint32 cur_block_pos; \
> +     /* \
> +      * Mark the previous block as completed, worker can start copying this data. \
> +      */ \
> +     if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \
> +             copy_buff_state.data_blk_ptr->curr_blk_completed == false) \
> +             copy_buff_state.data_blk_ptr->curr_blk_completed = true; \
> +     \
> +     copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> +     cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> +     copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \
> +     \
> +     if (!copy_buff_state.data_blk_ptr) \
> +     { \
> +             copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> +             chunk_first_block = cur_block_pos; \
> +     } \
> +     else if (need_data == false) \
> +             copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \
> +     \
> +     cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> +     copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> +}
> +
> +/*
> + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory.
> + */
> +#define END_CHUNK_PARALLEL_COPY() \
> +{ \
> +     if (!IsHeaderLine()) \
> +     { \
> +             ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> +             ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \
> +             if (copy_buff_state.chunk_size) \
> +             { \
> +                     ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> +                     /* \
> +                      * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \
> +                      * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \
> +                      * chunk finishes at the end of the current block. If the \
> +                      * new_line_size > raw_buf_ptr, then the new block has only new line \
> +                      * char content. The unprocessed count should not be increased in \
> +                      * this case. \
> +                      */ \
> +                     if (copy_buff_state.raw_buf_ptr != 0 && \
> +                             copy_buff_state.raw_buf_ptr > new_line_size) \
> +                             pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \
> +                     \
> +                     /* Update chunk size. */ \
> +                     pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \
> +                     pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \
> +                     elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \
> +                                             chunk_pos, copy_buff_state.chunk_size); \
> +                     pcshared_info->populated++; \
> +             } \
> +             else if (new_line_size) \
> +             { \
> +                     /* \
> +                      * This means only new line char, empty record should be \
> +                      * inserted. \
> +                      */ \
> +                     ChunkBoundary *chunkInfo; \
> +                     chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \
> +                                                        CHUNK_LEADER_POPULATED); \
> +                     chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> +                     elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \
> +                                              chunkInfo->start_offset, chunk_pos, \
> +                                              pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> +                     pcshared_info->populated++; \
> +             } \
> +     }\
> +     \
> +     /*\
> +      * All of the read data is processed, reset index & len. In the\
> +      * subsequent read, we will get a new block and copy data in to the\
> +      * new block.\
> +      */\
> +     if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len)\
> +     {\
> +             cstate->raw_buf_index = 0;\
> +             cstate->raw_buf_len = 0;\
> +     }\
> +     else\
> +             cstate->raw_buf_len = copy_buff_state.copy_buf_len;\
> +}

Why are these macros? They are way way way above a length where that
makes any sort of sense.
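
For illustration, the tail end of END_CHUNK_PARALLEL_COPY could be
written as a static inline function along these lines. The struct and
function names are simplified, hypothetical stand-ins for the patch's
cstate and copy_buff_state, and only the buffer-reset part is shown:

```c
#include <assert.h>

/* Hypothetical, cut-down stand-ins for the state the macro touches. */
typedef struct SketchBufState
{
    int         raw_buf_ptr;
    int         copy_buf_len;
} SketchBufState;

typedef struct SketchCopyState
{
    int         raw_buf_index;
    int         raw_buf_len;
} SketchCopyState;

/*
 * If all of the read data has been processed, reset index and length so
 * the next read starts with a new block; otherwise remember how much
 * data the buffer holds.
 */
static inline void
sketch_finish_chunk_buffer(SketchCopyState *cstate, const SketchBufState *bs)
{
    if (bs->raw_buf_ptr == bs->copy_buf_len)
    {
        cstate->raw_buf_index = 0;
        cstate->raw_buf_len = 0;
    }
    else
        cstate->raw_buf_len = bs->copy_buf_len;
}
```

Unlike the macro, this has typed, explicit inputs, does not silently
depend on local variables of the calling function, and shows up as a
normal frame in a debugger.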


Greetings,

Andres Freund

