On Thu, Jun 4, 2020 at 12:44 AM Andres Freund <and...@anarazel.de> wrote
>
>
> Hm. you don't explicitly mention that in your design, but given how
> small the benefits going from 0-1 workers is, I assume the leader
> doesn't do any "chunk processing" on its own?
>

Yes, you are right; the leader does not do any processing. The leader's
work is mainly to populate the shared memory with the offset
information for each record.
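
To make that division of labour concrete, below is a minimal standalone
sketch (not the patch code; the type and function names are illustrative)
of the leader publishing (offset, size) pairs into a shared ring that the
workers consume:

/*
 * Illustrative sketch: the leader only scans the input for record
 * boundaries and publishes (start offset, size) pairs into a shared ring.
 * Here a chunk_size of 0 marks a free slot, just to keep the sketch simple.
 */
#include <stdint.h>
#include <stdatomic.h>

#define RING_SIZE 16

typedef struct ChunkSlot
{
	uint32_t	start_offset;	/* where the record starts in the data block */
	atomic_uint	chunk_size;		/* 0 while free, record length once published */
} ChunkSlot;

typedef struct ChunkRing
{
	ChunkSlot	ring[RING_SIZE];
	uint32_t	leader_pos;		/* next slot the leader will fill */
} ChunkRing;

/* Leader side: publish one identified record; wait if the slot is still busy. */
static void
publish_chunk(ChunkRing *r, uint32_t offset, uint32_t size)
{
	ChunkSlot  *slot = &r->ring[r->leader_pos];

	while (atomic_load(&slot->chunk_size) != 0)
		;						/* wait until a worker frees this slot */

	slot->start_offset = offset;
	atomic_store(&slot->chunk_size, size);	/* makes the chunk visible to workers */
	r->leader_pos = (r->leader_pos + 1) % RING_SIZE;
}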

>
>
> > Design of the Parallel Copy: The backend, to which the "COPY FROM" query is
> > submitted acts as leader with the responsibility of reading data from the
> > file/stdin, launching at most n number of workers as specified with
> > PARALLEL 'n' option in the "COPY FROM" query. The leader populates the
> > common data required for the workers execution in the DSM and shares it
> > with the workers. The leader then executes before statement triggers if
> > there exists any. Leader populates DSM chunks which includes the start
> > offset and chunk size, while populating the chunks it reads as many blocks
> > as required into the DSM data blocks from the file. Each block is of 64K
> > size. The leader parses the data to identify a chunk, the existing logic
> > from CopyReadLineText which identifies the chunks with some changes was
> > used for this. Leader checks if a free chunk is available to copy the
> > information, if there is no free chunk it waits till the required chunk is
> > freed up by the worker and then copies the identified chunks information
> > (offset & chunk size) into the DSM chunks. This process is repeated till
> > the complete file is processed. Simultaneously, the workers cache the
> > chunks(50) locally into the local memory and release the chunks to the
> > leader for further populating. Each worker processes the chunk that it
> > cached and inserts it into the table. The leader waits till all the chunks
> > populated are processed by the workers and exits.
>
> Why do we need the local copy of 50 chunks? Copying memory around is far
> from free. I don't see why it'd be better to add per-process caching,
> rather than making the DSM bigger? I can see some benefit in marking
> multiple chunks as being processed with one lock acquisition, but I
> don't think adding a memory copy is a good idea.

We ran a performance test with a 5.1GB csv data file, 10 million
tuples, and 2 indexes on integer columns; the results are given below.
We noticed that in some cases the performance is better if we copy the
50 records locally and release the shared memory. We will get better
benefits as the number of workers increases. Thoughts?
--------------------------------------------------------------------------------
Workers | Exec time (with local copying of      | Exec time (without copying,
        | 50 records & releasing shared memory) | processing record by record)
--------------------------------------------------------------------------------
0       | 1162.772 (1X)                         | 1152.684 (1X)
2       |  635.249 (1.83X)                      |  647.894 (1.78X)
4       |  336.835 (3.45X)                      |  335.534 (3.43X)
8       |  188.577 (6.17X)                      |  189.461 (6.08X)
16      |  126.819 (9.17X)                      |  142.730 (8.07X)
20      |  117.845 (9.87X)                      |  146.533 (7.87X)
30      |  127.554 (9.11X)                      |  160.307 (7.19X)
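
To illustrate what the left column measures, here is a rough standalone
sketch of the worker side (reusing the ChunkSlot/ChunkRing types from the
sketch above; claim_next_chunk, copy_record and insert_record are
hypothetical placeholders, not functions from the patch). The point is that
the worker copies up to 50 chunks into private memory and frees the shared
slots before doing the comparatively slow insert work:

#define WORKER_CHUNK_COUNT 50

/* Hypothetical helpers, declared only to make the sketch complete. */
extern ChunkSlot *claim_next_chunk(ChunkRing *r);	/* CAS a slot to "processing" */
extern char *copy_record(const char *shared_data, const ChunkSlot *slot);
extern void insert_record(const char *rec, uint32_t size);

typedef struct LocalChunk
{
	char	   *data;			/* private copy of one record */
	uint32_t	size;
} LocalChunk;

static void
worker_loop(ChunkRing *r, const char *shared_data)
{
	LocalChunk	local[WORKER_CHUNK_COUNT];

	for (;;)
	{
		int			nchunks = 0;

		/*
		 * Phase 1: copy up to 50 chunks into private memory, releasing the
		 * shared slots immediately so the leader can refill them.
		 */
		while (nchunks < WORKER_CHUNK_COUNT)
		{
			ChunkSlot  *slot = claim_next_chunk(r);

			if (slot == NULL)
				break;			/* nothing available right now */

			local[nchunks].size = atomic_load(&slot->chunk_size);
			local[nchunks].data = copy_record(shared_data, slot);
			atomic_store(&slot->chunk_size, 0);	/* slot reusable by the leader */
			nchunks++;
		}

		if (nchunks == 0)
			break;				/* input exhausted */

		/* Phase 2: the slow part (parsing + insert) runs on the private copies. */
		for (int i = 0; i < nchunks; i++)
			insert_record(local[i].data, local[i].size);
	}
}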

> This patch *desperately* needs to be split up. It imo is close to
> unreviewable, due to a large amount of changes that just move code
> around without other functional changes being mixed in with the actual
> new stuff.

I have split the patch; the new split patches are attached.

>
>
>
> >  /*
> > + * State of the chunk.
> > + */
> > +typedef enum ChunkState
> > +{
> > +     CHUNK_INIT,                                     /* initial state of chunk */
> > +     CHUNK_LEADER_POPULATING,        /* leader processing chunk */
> > +     CHUNK_LEADER_POPULATED,         /* leader completed populating chunk */
> > +     CHUNK_WORKER_PROCESSING,        /* worker processing chunk */
> > +     CHUNK_WORKER_PROCESSED          /* worker completed processing chunk */
> > +}ChunkState;
> > +
> > +#define RAW_BUF_SIZE 65536           /* we palloc RAW_BUF_SIZE+1 bytes */
> > +
> > +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> > +#define RINGSIZE (10 * 1000)
> > +#define MAX_BLOCKS_COUNT 1000
> > +#define WORKER_CHUNK_COUNT 50        /* should be mod of RINGSIZE */
> > +
> > +#define      IsParallelCopy()                (cstate->is_parallel)
> > +#define IsLeader()                           (cstate->pcdata->is_leader)
> > +#define IsHeaderLine()                       (cstate->header_line && cstate->cur_lineno == 1)
> > +
> > +/*
> > + * Copy data block information.
> > + */
> > +typedef struct CopyDataBlock
> > +{
> > +     /* The number of unprocessed chunks in the current block. */
> > +     pg_atomic_uint32 unprocessed_chunk_parts;
> > +
> > +     /*
> > +      * If the current chunk data is continued into another block,
> > +      * following_block will have the position where the remaining data need to
> > +      * be read.
> > +      */
> > +     uint32  following_block;
> > +
> > +     /*
> > +      * This flag will be set, when the leader finds out this block can be read
> > +      * safely by the worker. This helps the worker to start processing the chunk
> > +      * early where the chunk will be spread across many blocks and the worker
> > +      * need not wait for the complete chunk to be processed.
> > +      */
> > +     bool   curr_blk_completed;
> > +     char   data[DATA_BLOCK_SIZE + 1]; /* data read from file */
> > +}CopyDataBlock;
>
> What's the + 1 here about?

Fixed this; removed the +1, it is not needed.

>
>
> > +/*
> > + * Parallel copy line buffer information.
> > + */
> > +typedef struct ParallelCopyLineBuf
> > +{
> > +     StringInfoData          line_buf;
> > +     uint64                          cur_lineno;     /* line number for 
> > error messages */
> > +}ParallelCopyLineBuf;
>
> Why do we need separate infrastructure for this? We shouldn't duplicate
> infrastructure unnecessarily.
>

This was required for copying multiple records locally and releasing
the shared memory. I have not changed this; I will decide on it based
on the decision taken for the earlier comment on caching 50 records
locally.

>
>
>
> > +/*
> > + * Common information that need to be copied to shared memory.
> > + */
> > +typedef struct CopyWorkerCommonData
> > +{
>
> Why is parallel specific stuff here suddenly not named ParallelCopy*
> anymore? If you introduce a naming like that it imo should be used
> consistently.

Fixed; changed the structs so that all of them consistently use the
ParallelCopy prefix.

>
> > +     /* low-level state data */
> > +     CopyDest            copy_dest;          /* type of copy source/destination */
> > +     int                 file_encoding;      /* file or remote side's character encoding */
> > +     bool                need_transcoding;   /* file encoding diff from server? */
> > +     bool                encoding_embeds_ascii;      /* ASCII can be non-first byte? */
> > +
> > +     /* parameters from the COPY command */
> > +     bool                csv_mode;           /* Comma Separated Value format? */
> > +     bool                header_line;        /* CSV header line? */
> > +     int                 null_print_len; /* length of same */
> > +     bool                force_quote_all;    /* FORCE_QUOTE *? */
> > +     bool                convert_selectively;        /* do selective binary conversion? */
> > +
> > +     /* Working state for COPY FROM */
> > +     AttrNumber          num_defaults;
> > +     Oid                 relid;
> > +}CopyWorkerCommonData;
>
> But I actually think we shouldn't have this information in two different
> structs. This should exist once, independent of using parallel /
> non-parallel copy.
>

This structure helps in storing the common data from CopyStateData
that is required by the workers. This information is allocated and
stored in the DSM for the workers to retrieve and copy into their own
CopyStateData.
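
As a rough standalone illustration of that flow (the field names are
examples only, not the exact patch layout): the leader mirrors the
plain-value fields into a flat struct that can live in the DSM, and each
worker copies them back into its own state.

#include <stdbool.h>

typedef struct FullCopyState			/* backend-local state, with pointers etc. */
{
	int			file_encoding;
	bool		need_transcoding;
	bool		csv_mode;
	bool		header_line;
	/* ... buffers, pointers and other non-shareable state ... */
	char	   *raw_buf;
} FullCopyState;

typedef struct SharedCopyState			/* flat copy that lives in the DSM segment */
{
	int			file_encoding;
	bool		need_transcoding;
	bool		csv_mode;
	bool		header_line;
} SharedCopyState;

/* Leader: fill the shared struct from its own state. */
static void
serialize_copy_state(const FullCopyState *cs, SharedCopyState *shared)
{
	shared->file_encoding = cs->file_encoding;
	shared->need_transcoding = cs->need_transcoding;
	shared->csv_mode = cs->csv_mode;
	shared->header_line = cs->header_line;
}

/* Worker: rebuild its local state from the shared struct. */
static void
restore_copy_state(FullCopyState *cs, const SharedCopyState *shared)
{
	cs->file_encoding = shared->file_encoding;
	cs->need_transcoding = shared->need_transcoding;
	cs->csv_mode = shared->csv_mode;
	cs->header_line = shared->header_line;
}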

>
> > +/* List information */
> > +typedef struct ListInfo
> > +{
> > +     int     count;          /* count of attributes */
> > +
> > +     /* string info in the form info followed by info1, info2... infon  */
> > +     char    info[1];
> > +} ListInfo;
>
> Based on these comments I have no idea what this could be for.
>

I have added better comments for this. The following was added: This
structure helps in converting a List data type into the below format,
with count holding the number of elements in the list and info holding
the List elements appended contiguously. The converted structure is
allocated in shared memory and stored in the DSM for the worker to
retrieve and later convert back into a List.
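
A standalone sketch of that count-plus-contiguous-elements layout, assuming
a list of strings (the FlatList name and the helpers are illustrative, not
the patch's ListInfo code):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct FlatList
{
	int			count;			/* number of elements */
	char		info[];			/* count NUL-terminated strings, back to back */
} FlatList;

/* Flatten an array of strings into one contiguous allocation. */
static FlatList *
flatten(char **items, int count)
{
	size_t		payload = 0;
	FlatList   *flat;
	char	   *dst;

	for (int i = 0; i < count; i++)
		payload += strlen(items[i]) + 1;

	flat = malloc(sizeof(FlatList) + payload);
	flat->count = count;

	dst = flat->info;
	for (int i = 0; i < count; i++)
	{
		size_t		len = strlen(items[i]) + 1;

		memcpy(dst, items[i], len);
		dst += len;
	}
	return flat;
}

/* Walk the flattened form, element by element. */
static void
print_flat(const FlatList *flat)
{
	const char *p = flat->info;

	for (int i = 0; i < flat->count; i++)
	{
		printf("element %d: %s\n", i, p);
		p += strlen(p) + 1;
	}
}

int
main(void)
{
	char	   *attrs[] = {"id", "name", "created_at"};
	FlatList   *flat = flatten(attrs, 3);

	print_flat(flat);
	free(flat);
	return 0;
}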

>
> >  /*
> > - * This keeps the character read at the top of the loop in the buffer
> > - * even if there is more than one read-ahead.
> > + * This keeps the character read at the top of the loop in the buffer
> > + * even if there is more than one read-ahead.
> > + */
> > +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> > +if (1) \
> > +{ \
> > +     if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> > +     { \
> > +             if (IsParallelCopy()) \
> > +             { \
> > +                     copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> > +                     if (copy_buff_state.block_switched) \
> > +                     { \
> > +                             pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> > +                             copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> > +                     } \
> > +             } \
> > +             copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
> > +             need_data = true; \
> > +             continue; \
> > +     } \
> > +} else ((void) 0)
>
> I think it's an absolutely clear no-go to add new branches to
> these. They're *really* hot already, and this is going to sprinkle a
> significant amount of new instructions over a lot of places.
>

Fixed, removed this.

>
>
> > +/*
> > + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file data must
> > + * be read.
> > + */
> > +#define SET_RAWBUF_FOR_LOAD() \
> > +{ \
> > +     ShmCopyInfo     *pcshared_info = cstate->pcdata->pcshared_info; \
> > +     uint32 cur_block_pos; \
> > +     /* \
> > +      * Mark the previous block as completed, worker can start copying this data. \
> > +      */ \
> > +     if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \
> > +             copy_buff_state.data_blk_ptr->curr_blk_completed == false) \
> > +             copy_buff_state.data_blk_ptr->curr_blk_completed = true; \
> > +     \
> > +     copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> > +     cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> > +     copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \
> > +     \
> > +     if (!copy_buff_state.data_blk_ptr) \
> > +     { \
> > +             copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> > +             chunk_first_block = cur_block_pos; \
> > +     } \
> > +     else if (need_data == false) \
> > +             copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \
> > +     \
> > +     cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> > +     copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> > +}
> > +
> > +/*
> > + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory.
> > + */
> > +#define END_CHUNK_PARALLEL_COPY() \
> > +{ \
> > +     if (!IsHeaderLine()) \
> > +     { \
> > +             ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> > +             ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \
> > +             if (copy_buff_state.chunk_size) \
> > +             { \
> > +                     ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> > +                     /* \
> > +                      * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \
> > +                      * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \
> > +                      * chunk finishes at the end of the current block. If the \
> > +                      * new_line_size > raw_buf_ptr, then the new block has only new line \
> > +                      * char content. The unprocessed count should not be increased in \
> > +                      * this case. \
> > +                      */ \
> > +                     if (copy_buff_state.raw_buf_ptr != 0 && \
> > +                             copy_buff_state.raw_buf_ptr > new_line_size) \
> > +                             pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \
> > +                     \
> > +                     /* Update chunk size. */ \
> > +                     pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \
> > +                     pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \
> > +                     elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \
> > +                                             chunk_pos, copy_buff_state.chunk_size); \
> > +                     pcshared_info->populated++; \
> > +             } \
> > +             else if (new_line_size) \
> > +             { \
> > +                     /* \
> > +                      * This means only new line char, empty record should be \
> > +                      * inserted. \
> > +                      */ \
> > +                     ChunkBoundary *chunkInfo; \
> > +                     chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \
> > +                                                        CHUNK_LEADER_POPULATED); \
> > +                     chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> > +                     elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \
> > +                                              chunkInfo->start_offset, chunk_pos, \
> > +                                              pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> > +                     pcshared_info->populated++; \
> > +             } \
> > +     }\
> > +     \
> > +     /*\
> > +      * All of the read data is processed, reset index & len. In the\
> > +      * subsequent read, we will get a new block and copy data in to the\
> > +      * new block.\
> > +      */\
> > +     if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len)\
> > +     {\
> > +             cstate->raw_buf_index = 0;\
> > +             cstate->raw_buf_len = 0;\
> > +     }\
> > +     else\
> > +             cstate->raw_buf_len = copy_buff_state.copy_buf_len;\
> > +}
>
> Why are these macros? They are way way way above a length where that
> makes any sort of sense.
>

Converted these macros to functions.


Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From 97204eb6abafe891a654b34ff84cf9812e6c1fef Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 10 Jun 2020 06:07:17 +0530
Subject: [PATCH 1/4] Copy code readjustment to support parallel copy.

This patch slightly readjusts the copy code so that the common code is
separated into functions/macros; these functions/macros will be used by the
workers in the parallel copy code of the upcoming patches. EOL removal is moved
from CopyReadLine to CopyReadLineText. This change is required because, in the
case of parallel copy, chunk identification and chunk updates are done in
CopyReadLineText, and the newline characters must be removed before the chunk
information is updated in shared memory.
---
 src/backend/commands/copy.c | 320 ++++++++++++++++++++++++++------------------
 1 file changed, 191 insertions(+), 129 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6d53dc4..eaf0f78 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -95,6 +95,9 @@ typedef enum CopyInsertMethod
 	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
+#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
+#define IsHeaderLine()			(cstate->header_line && cstate->cur_lineno == 1)
+
 /*
  * This struct contains all the state variables used throughout a COPY
  * operation. For simplicity, we use the same struct for all variants of COPY,
@@ -219,7 +222,6 @@ typedef struct CopyStateData
 	 * converts it.  Note: we guarantee that there is a \0 at
 	 * raw_buf[raw_buf_len].
 	 */
-#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
@@ -347,6 +349,88 @@ if (1) \
 	goto not_end_of_copy; \
 } else ((void) 0)
 
+/*
+ * CONVERT_TO_SERVER_ENCODING - convert contents to server encoding.
+ */
+#define CONVERT_TO_SERVER_ENCODING(cstate) \
+{ \
+	/* Done reading the line.  Convert it to server encoding. */ \
+	if (cstate->need_transcoding) \
+	{ \
+		char	   *cvt; \
+		cvt = pg_any_to_server(cstate->line_buf.data, \
+							   cstate->line_buf.len, \
+							   cstate->file_encoding); \
+		if (cvt != cstate->line_buf.data) \
+		{ \
+			/* transfer converted data back to line_buf */ \
+			resetStringInfo(&cstate->line_buf); \
+			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); \
+			pfree(cvt); \
+		} \
+	} \
+	/* Now it's safe to use the buffer in error messages */ \
+	cstate->line_buf_converted = true; \
+}
+
+/*
+ * CLEAR_EOL_FROM_COPIED_DATA - Clear EOL from the copied data.
+ */
+#define CLEAR_EOL_FROM_COPIED_DATA(copy_line_data, copy_line_pos, copy_line_size) \
+{ \
+	/* \
+	 * If we didn't hit EOF, then we must have transferred the EOL marker \
+	 * to line_buf along with the data.  Get rid of it. \
+	 */ \
+   switch (cstate->eol_type) \
+   { \
+	   case EOL_NL: \
+		   Assert(copy_line_size >= 1); \
+		   Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+		   copy_line_data[copy_line_pos - 1] = '\0'; \
+		   copy_line_size--; \
+		   break; \
+	   case EOL_CR: \
+		   Assert(copy_line_size >= 1); \
+		   Assert(copy_line_data[copy_line_pos - 1] == '\r'); \
+		   copy_line_data[copy_line_pos - 1] = '\0'; \
+		   copy_line_size--; \
+		   break; \
+	   case EOL_CRNL: \
+		   Assert(copy_line_size >= 2); \
+		   Assert(copy_line_data[copy_line_pos - 2] == '\r'); \
+		   Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+		   copy_line_data[copy_line_pos - 2] = '\0'; \
+		   copy_line_size -= 2; \
+		   break; \
+	   case EOL_UNKNOWN: \
+		   /* shouldn't get here */ \
+		   Assert(false); \
+		   break; \
+   } \
+}
+
+/*
+ * CLEAR_EOL_LINE - Wrapper for clearing EOL.
+ */
+#define CLEAR_EOL_LINE() \
+if (!result && !IsHeaderLine()) \
+	CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \
+								cstate->line_buf.len, \
+								cstate->line_buf.len) \
+
+/*
+ * INCREMENTPROCESSED - Increment the lines processed.
+ */
+#define INCREMENTPROCESSED(processed)  \
+processed++;
+
+/*
+ * GETPROCESSED - Get the lines processed.
+ */
+#define GETPROCESSED(processed) \
+return processed;
+
 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
@@ -393,6 +477,8 @@ static bool CopyGetInt32(CopyState cstate, int32 *val);
 static void CopySendInt16(CopyState cstate, int16 val);
 static bool CopyGetInt16(CopyState cstate, int16 *val);
 
+static void PopulateAttributes(CopyState cstate, TupleDesc	tup_desc,
+							   List *attnamelist);
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -1464,7 +1550,6 @@ BeginCopy(ParseState *pstate,
 {
 	CopyState	cstate;
 	TupleDesc	tupDesc;
-	int			num_phys_attrs;
 	MemoryContext oldcontext;
 
 	/* Allocate workspace and zero all fields */
@@ -1630,6 +1715,22 @@ BeginCopy(ParseState *pstate,
 		tupDesc = cstate->queryDesc->tupDesc;
 	}
 
+	PopulateAttributes(cstate, tupDesc, attnamelist);
+	cstate->copy_dest = COPY_FILE;	/* default */
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return cstate;
+}
+
+/*
+ * PopulateAttributes - Populate the attributes.
+ */
+static void
+PopulateAttributes(CopyState cstate, TupleDesc	tupDesc, List *attnamelist)
+{
+	int			num_phys_attrs;
+
 	/* Generate or convert list of attributes to process */
 	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
 
@@ -1749,12 +1850,6 @@ BeginCopy(ParseState *pstate,
 		 pg_database_encoding_max_length() > 1);
 	/* See Multibyte encoding comment above */
 	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
-
-	cstate->copy_dest = COPY_FILE;	/* default */
-
-	MemoryContextSwitchTo(oldcontext);
-
-	return cstate;
 }
 
 /*
@@ -2647,32 +2742,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 }
 
 /*
- * Copy FROM file to relation.
+ * Check if the relation specified in copy from is valid.
  */
-uint64
-CopyFrom(CopyState cstate)
+static void
+CheckCopyFromValidity(CopyState cstate)
 {
-	ResultRelInfo *resultRelInfo;
-	ResultRelInfo *target_resultRelInfo;
-	ResultRelInfo *prevResultRelInfo = NULL;
-	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
-	ModifyTableState *mtstate;
-	ExprContext *econtext;
-	TupleTableSlot *singleslot = NULL;
-	MemoryContext oldcontext = CurrentMemoryContext;
-
-	PartitionTupleRouting *proute = NULL;
-	ErrorContextCallback errcallback;
-	CommandId	mycid = GetCurrentCommandId(true);
-	int			ti_options = 0; /* start with default options for insert */
-	BulkInsertState bistate = NULL;
-	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
-	uint64		processed = 0;
-	bool		has_before_insert_row_trig;
-	bool		has_instead_insert_row_trig;
-	bool		leafpart_use_multi_insert = false;
-
 	Assert(cstate->rel);
 
 	/*
@@ -2708,6 +2782,36 @@ CopyFrom(CopyState cstate)
 					 errmsg("cannot copy to non-table relation \"%s\"",
 							RelationGetRelationName(cstate->rel))));
 	}
+}
+
+/*
+ * Copy FROM file to relation.
+ */
+uint64
+CopyFrom(CopyState cstate)
+{
+	ResultRelInfo *resultRelInfo;
+	ResultRelInfo *target_resultRelInfo;
+	ResultRelInfo *prevResultRelInfo = NULL;
+	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
+	ModifyTableState *mtstate;
+	ExprContext *econtext;
+	TupleTableSlot *singleslot = NULL;
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	PartitionTupleRouting *proute = NULL;
+	ErrorContextCallback errcallback;
+	CommandId	mycid = GetCurrentCommandId(true);
+	int			ti_options = 0; /* start with default options for insert */
+	BulkInsertState bistate = NULL;
+	CopyInsertMethod insertMethod;
+	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
+	uint64		processed = 0;
+	bool		has_before_insert_row_trig;
+	bool		has_instead_insert_row_trig;
+	bool		leafpart_use_multi_insert = false;
+
+	CheckCopyFromValidity(cstate);
 
 	/*
 	 * If the target file is new-in-transaction, we assume that checking FSM
@@ -3262,7 +3366,7 @@ CopyFrom(CopyState cstate)
 			 * or FDW; this is the same definition used by nodeModifyTable.c
 			 * for counting tuples inserted by an INSERT command.
 			 */
-			processed++;
+			INCREMENTPROCESSED(processed)
 		}
 	}
 
@@ -3317,30 +3421,15 @@ CopyFrom(CopyState cstate)
 
 	FreeExecutorState(estate);
 
-	return processed;
+	GETPROCESSED(processed)
 }
 
 /*
- * Setup to read tuples from a file for COPY FROM.
- *
- * 'rel': Used as a template for the tuples
- * 'filename': Name of server-local file to read
- * 'attnamelist': List of char *, columns to include. NIL selects all cols.
- * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
- *
- * Returns a CopyState, to be passed to NextCopyFrom and related functions.
+ * PopulateCatalogInformation - populate the catalog information.
  */
-CopyState
-BeginCopyFrom(ParseState *pstate,
-			  Relation rel,
-			  const char *filename,
-			  bool is_program,
-			  copy_data_source_cb data_source_cb,
-			  List *attnamelist,
-			  List *options)
+static void
+PopulateCatalogInformation(CopyState cstate)
 {
-	CopyState	cstate;
-	bool		pipe = (filename == NULL);
 	TupleDesc	tupDesc;
 	AttrNumber	num_phys_attrs,
 				num_defaults;
@@ -3350,31 +3439,8 @@ BeginCopyFrom(ParseState *pstate,
 	Oid			in_func_oid;
 	int		   *defmap;
 	ExprState **defexprs;
-	MemoryContext oldcontext;
 	bool		volatile_defexprs;
 
-	cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
-	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
-
-	/* Initialize state variables */
-	cstate->reached_eof = false;
-	cstate->eol_type = EOL_UNKNOWN;
-	cstate->cur_relname = RelationGetRelationName(cstate->rel);
-	cstate->cur_lineno = 0;
-	cstate->cur_attname = NULL;
-	cstate->cur_attval = NULL;
-
-	/* Set up variables to avoid per-attribute overhead. */
-	initStringInfo(&cstate->attribute_buf);
-	initStringInfo(&cstate->line_buf);
-	cstate->line_buf_converted = false;
-	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
-	cstate->raw_buf_index = cstate->raw_buf_len = 0;
-
-	/* Assign range table, we'll need it in CopyFrom. */
-	if (pstate)
-		cstate->range_table = pstate->p_rtable;
-
 	tupDesc = RelationGetDescr(cstate->rel);
 	num_phys_attrs = tupDesc->natts;
 	num_defaults = 0;
@@ -3452,6 +3518,54 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->defexprs = defexprs;
 	cstate->volatile_defexprs = volatile_defexprs;
 	cstate->num_defaults = num_defaults;
+}
+
+/*
+ * Setup to read tuples from a file for COPY FROM.
+ *
+ * 'rel': Used as a template for the tuples
+ * 'filename': Name of server-local file to read
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyState, to be passed to NextCopyFrom and related functions.
+ */
+CopyState
+BeginCopyFrom(ParseState *pstate,
+			  Relation rel,
+			  const char *filename,
+			  bool is_program,
+			  copy_data_source_cb data_source_cb,
+			  List *attnamelist,
+			  List *options)
+{
+	CopyState	cstate;
+	bool		pipe = (filename == NULL);
+	MemoryContext oldcontext;
+
+	cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
+	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
+	/* Initialize state variables */
+	cstate->reached_eof = false;
+	cstate->eol_type = EOL_UNKNOWN;
+	cstate->cur_relname = RelationGetRelationName(cstate->rel);
+	cstate->cur_lineno = 0;
+	cstate->cur_attname = NULL;
+	cstate->cur_attval = NULL;
+
+	/* Set up variables to avoid per-attribute overhead. */
+	initStringInfo(&cstate->attribute_buf);
+	initStringInfo(&cstate->line_buf);
+	cstate->line_buf_converted = false;
+	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+	cstate->raw_buf_index = cstate->raw_buf_len = 0;
+
+	/* Assign range table, we'll need it in CopyFrom. */
+	if (pstate)
+		cstate->range_table = pstate->p_rtable;
+
+	PopulateCatalogInformation(cstate);
 	cstate->is_program = is_program;
 
 	if (data_source_cb)
@@ -3839,7 +3953,6 @@ static bool
 CopyReadLine(CopyState cstate)
 {
 	bool		result;
-
 	resetStringInfo(&cstate->line_buf);
 	cstate->line_buf_valid = true;
 
@@ -3864,60 +3977,8 @@ CopyReadLine(CopyState cstate)
 			} while (CopyLoadRawBuf(cstate));
 		}
 	}
-	else
-	{
-		/*
-		 * If we didn't hit EOF, then we must have transferred the EOL marker
-		 * to line_buf along with the data.  Get rid of it.
-		 */
-		switch (cstate->eol_type)
-		{
-			case EOL_NL:
-				Assert(cstate->line_buf.len >= 1);
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
-				cstate->line_buf.len--;
-				cstate->line_buf.data[cstate->line_buf.len] = '\0';
-				break;
-			case EOL_CR:
-				Assert(cstate->line_buf.len >= 1);
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
-				cstate->line_buf.len--;
-				cstate->line_buf.data[cstate->line_buf.len] = '\0';
-				break;
-			case EOL_CRNL:
-				Assert(cstate->line_buf.len >= 2);
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
-				cstate->line_buf.len -= 2;
-				cstate->line_buf.data[cstate->line_buf.len] = '\0';
-				break;
-			case EOL_UNKNOWN:
-				/* shouldn't get here */
-				Assert(false);
-				break;
-		}
-	}
-
-	/* Done reading the line.  Convert it to server encoding. */
-	if (cstate->need_transcoding)
-	{
-		char	   *cvt;
-
-		cvt = pg_any_to_server(cstate->line_buf.data,
-							   cstate->line_buf.len,
-							   cstate->file_encoding);
-		if (cvt != cstate->line_buf.data)
-		{
-			/* transfer converted data back to line_buf */
-			resetStringInfo(&cstate->line_buf);
-			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
-			pfree(cvt);
-		}
-	}
-
-	/* Now it's safe to use the buffer in error messages */
-	cstate->line_buf_converted = true;
 
+	CONVERT_TO_SERVER_ENCODING(cstate)
 	return result;
 }
 
@@ -4281,6 +4342,7 @@ not_end_of_copy:
 	 * Transfer any still-uncopied data to line_buf.
 	 */
 	REFILL_LINEBUF;
+	CLEAR_EOL_LINE()
 
 	return result;
 }
-- 
1.8.3.1

From 4ff785c888e93a8dd33d4e48cb4f804e204cb739 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 10 Jun 2020 07:18:33 +0530
Subject: [PATCH 3/4] Allow copy from command to process data from file/STDIN
 contents to a table in parallel.

This feature allows COPY FROM to leverage multiple CPUs in order to copy
data from file/STDIN to a table. It adds a PARALLEL option to the COPY FROM
command with which the user can specify the number of workers to be used
to perform the COPY FROM command. Specifying zero as the number of workers
disables parallelism.
The backend to which the "COPY FROM" query is submitted acts as the leader,
with the responsibility of reading data from the file/stdin and launching at
most n workers, as specified with the PARALLEL 'n' option in the "COPY FROM"
query. The leader populates the common data required for the workers'
execution in the DSM and shares it with the workers. The leader then executes
before statement triggers, if any exist. The leader populates the DSM chunks,
which include the start offset and chunk size; while populating the chunks it
reads as many blocks as required into the DSM data blocks from the file. Each
block is 64K in size. The leader parses the data to identify a chunk; the
existing logic from CopyReadLineText, which identifies the chunks, is reused
for this with some changes. The leader checks if a free chunk is available to
copy the information; if there is no free chunk, it waits until the required
chunk is freed up by a worker and then copies the identified chunk's
information (offset & chunk size) into the DSM chunks. This process is
repeated until the complete file is processed. Simultaneously, the workers
cache the chunks (50 at a time) locally in local memory and release the chunks
to the leader for further populating. Each worker processes the chunks that it
cached and inserts them into the table. The leader does not participate in the
insertion of data; the leader's only responsibility is to identify the chunks
as fast as possible for the workers to do the actual copy operation. The
leader waits until all the populated chunks are processed by the workers and
then exits.
---
 src/backend/access/heap/heapam.c     |  13 -
 src/backend/access/transam/xact.c    |  13 +
 src/backend/commands/copy.c          | 875 +++++++++++++++++++++++++++++++++--
 src/backend/optimizer/util/clauses.c |   2 +-
 src/include/access/xact.h            |   1 +
 src/tools/pgindent/typedefs.list     |   1 +
 6 files changed, 853 insertions(+), 52 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 94eb37d..6991b9f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2012,19 +2012,6 @@ static HeapTuple
 heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
 					CommandId cid, int options)
 {
-	/*
-	 * Parallel operations are required to be strictly read-only in a parallel
-	 * worker.  Parallel inserts are not safe even in the leader in the
-	 * general case, because group locking means that heavyweight locks for
-	 * relation extension or GIN page locks will not conflict between members
-	 * of a lock group, but we don't prohibit that case here because there are
-	 * useful special cases that we can safely allow, such as CREATE TABLE AS.
-	 */
-	if (IsParallelWorker())
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot insert tuples in a parallel worker")));
-
 	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
 	tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
 	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index cd30b62..d43902c 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -502,6 +502,19 @@ GetCurrentFullTransactionIdIfAny(void)
 }
 
 /*
+ *	AssignFullTransactionIdForWorker
+ *
+ * For parallel copy, all the workers must use the same transaction id.
+ */
+void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId)
+{
+	TransactionState s = CurrentTransactionState;
+
+	Assert((IsInParallelMode() || IsParallelWorker()));
+	s->fullTransactionId = fullTransactionId;
+}
+
+/*
  *	MarkCurrentTransactionIdLoggedIfAny
  *
  * Remember that the current xid - if it is assigned - now has been wal logged.
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index d930644..b1e2e71 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -26,6 +26,7 @@
 #include "access/xlog.h"
 #include "catalog/dependency.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_proc_d.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "commands/defrem.h"
@@ -40,11 +41,13 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "optimizer/clauses.h"
 #include "optimizer/optimizer.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
+#include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
@@ -95,6 +98,18 @@ typedef enum CopyInsertMethod
 	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
+/*
+ * State of the chunk.
+ */
+typedef enum ParallelCopyChunkState
+{
+	CHUNK_INIT,					/* initial state of chunk */
+	CHUNK_LEADER_POPULATING,	/* leader processing chunk */
+	CHUNK_LEADER_POPULATED,		/* leader completed populating chunk */
+	CHUNK_WORKER_PROCESSING,	/* worker processing chunk */
+	CHUNK_WORKER_PROCESSED		/* worker completed processing chunk */
+}ParallelCopyChunkState;
+
 #define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
 
 #define DATA_BLOCK_SIZE RAW_BUF_SIZE
@@ -527,9 +542,13 @@ if (1) \
 { \
 	if (raw_buf_ptr > cstate->raw_buf_index) \
 	{ \
-		appendBinaryStringInfo(&cstate->line_buf, \
-							 cstate->raw_buf + cstate->raw_buf_index, \
-							   raw_buf_ptr - cstate->raw_buf_index); \
+		if (!IsParallelCopy()) \
+			appendBinaryStringInfo(&cstate->line_buf, \
+								   cstate->raw_buf + cstate->raw_buf_index, \
+								   raw_buf_ptr - cstate->raw_buf_index); \
+		else \
+			chunk_size +=  raw_buf_ptr - cstate->raw_buf_index; \
+		\
 		cstate->raw_buf_index = raw_buf_ptr; \
 	} \
 } else ((void) 0)
@@ -542,13 +561,40 @@ if (1) \
 	goto not_end_of_copy; \
 } else ((void) 0)
 
+/* Begin parallel copy Macros */
+#define SET_NEWLINE_SIZE() \
+{ \
+	if (cstate->eol_type == EOL_NL || cstate->eol_type == EOL_CR) \
+		new_line_size = 1; \
+	else if (cstate->eol_type == EOL_CRNL) \
+		new_line_size = 2; \
+	else \
+		new_line_size = 0; \
+}
+
+/*
+ * COPY_WAIT_TO_PROCESS - Wait before continuing to process.
+ */
+#define COPY_WAIT_TO_PROCESS() \
+{ \
+	CHECK_FOR_INTERRUPTS(); \
+	(void) WaitLatch(MyLatch, \
+					 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, \
+					 1L, WAIT_EVENT_PG_SLEEP); \
+	ResetLatch(MyLatch); \
+}
+
+/* End parallel copy Macros */
+
 /*
  * CONVERT_TO_SERVER_ENCODING - convert contents to server encoding.
  */
 #define CONVERT_TO_SERVER_ENCODING(cstate) \
 { \
 	/* Done reading the line.  Convert it to server encoding. */ \
-	if (cstate->need_transcoding) \
+	if (cstate->need_transcoding && \
+		(!IsParallelCopy() || \
+		 (IsParallelCopy() && !IsLeader()))) \
 	{ \
 		char	   *cvt; \
 		cvt = pg_any_to_server(cstate->line_buf.data, \
@@ -607,22 +653,38 @@ if (1) \
  * CLEAR_EOL_LINE - Wrapper for clearing EOL.
  */
 #define CLEAR_EOL_LINE() \
-if (!result && !IsHeaderLine()) \
-	CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \
-								cstate->line_buf.len, \
-								cstate->line_buf.len) \
+{ \
+	if (!result && !IsHeaderLine()) \
+	{ \
+		if (IsParallelCopy()) \
+			CLEAR_EOL_FROM_COPIED_DATA(cstate->raw_buf, \
+									   raw_buf_ptr, chunk_size) \
+		else \
+			CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \
+									   cstate->line_buf.len, \
+									   cstate->line_buf.len) \
+	} \
+}
 
 /*
  * INCREMENTPROCESSED - Increment the lines processed.
  */
-#define INCREMENTPROCESSED(processed)  \
-processed++;
+#define INCREMENTPROCESSED(processed) \
+{ \
+	if (!IsParallelCopy()) \
+		processed++; \
+	else \
+		pg_atomic_add_fetch_u64(&cstate->pcdata->pcshared_info->processed, 1); \
+}
 
 /*
  * GETPROCESSED - Get the lines processed.
  */
 #define GETPROCESSED(processed) \
-return processed;
+if (!IsParallelCopy()) \
+	return processed; \
+else \
+	return pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed);
 
 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
@@ -671,8 +733,12 @@ static void CopySendInt16(CopyState cstate, int16 val);
 static bool CopyGetInt16(CopyState cstate, int16 *val);
 
 static pg_attribute_always_inline void EndParallelCopy(ParallelContext *pcxt);
+static void ExecBeforeStmtTrigger(CopyState cstate);
+static void CheckCopyFromValidity(CopyState cstate);
 static void PopulateAttributes(CopyState cstate, TupleDesc	tup_desc,
 							   List *attnamelist);
+static void PopulateCatalogInformation(CopyState cstate);
+static pg_attribute_always_inline uint32 GetChunkPosition(CopyState cstate);
 static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname);
 static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr);
 
@@ -826,6 +892,130 @@ InsertListShm(ParallelContext *pcxt, int key, List *inputlist,
 }
 
 /*
+ * IsTriggerFunctionParallelSafe - Check if the trigger function is parallel
+ * safe for the triggers. Return false if any one of the trigger has parallel
+ * unsafe function.
+ */
+static pg_attribute_always_inline bool
+IsTriggerFunctionParallelSafe(TriggerDesc *trigdesc)
+{
+	int	i;
+	for (i = 0; i < trigdesc->numtriggers; i++)
+	{
+		Trigger    *trigger = &trigdesc->triggers[i];
+		int 		trigtype = RI_TRIGGER_NONE;
+
+		if (func_parallel(trigger->tgfoid) != PROPARALLEL_SAFE)
+			return false;
+
+		/* If the trigger is parallel safe, also look for RI_TRIGGER. */
+		trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+		if (trigtype == RI_TRIGGER_PK || trigtype == RI_TRIGGER_FK)
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * CheckExprParallelSafety - determine parallel safety of volatile expressions
+ * in default clause of column definition or in where clause and return true if
+ * they are parallel safe.
+ */
+static pg_attribute_always_inline bool
+CheckExprParallelSafety(CopyState cstate)
+{
+	if (contain_volatile_functions(cstate->whereClause))
+	{
+		if (!is_parallel_safe(NULL, (Node *)cstate->whereClause))
+			return false;
+	}
+
+	if (cstate->volatile_defexprs && cstate->defexprs != NULL &&
+		cstate->num_defaults != 0)
+	{
+		int     i;
+		for (i = 0; i < cstate->num_defaults; i++)
+		{
+			if (!is_parallel_safe(NULL, (Node *) cstate->defexprs[i]->expr))
+				return false;
+		}
+	}
+
+	return true;
+}
+
+/*
+ * FindInsertMethod - determine insert mode single, multi, or multi conditional.
+ */
+static pg_attribute_always_inline CopyInsertMethod
+FindInsertMethod(CopyState cstate)
+{
+	if (cstate->rel->trigdesc != NULL &&
+		(cstate->rel->trigdesc->trig_insert_before_row ||
+		 cstate->rel->trigdesc->trig_insert_instead_row))
+		return CIM_SINGLE;
+
+	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+			cstate->rel->trigdesc != NULL &&
+			cstate->rel->trigdesc->trig_insert_new_table)
+		return CIM_SINGLE;
+
+	if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+		return CIM_SINGLE;
+
+	if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return CIM_MULTI_CONDITIONAL;
+
+	return CIM_MULTI;
+}
+
+/*
+ * IsParallelCopyAllowed - check for the cases where parallel copy is not
+ * applicable.
+ */
+static pg_attribute_always_inline bool
+IsParallelCopyAllowed(CopyState cstate)
+{
+	/* Parallel copy not allowed for freeze & binary option. */
+	if (cstate->freeze || cstate->binary)
+		return false;
+
+	/* Check if copy is into foreign table. */
+	if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+		return false;
+
+	/* Check if copy is into a temporary table. */
+	if (RELATION_IS_LOCAL(cstate->rel) || RELATION_IS_OTHER_TEMP(cstate->rel))
+		return false;
+
+	/* Check if trigger function is parallel safe. */
+	if (cstate->rel->trigdesc != NULL &&
+			!IsTriggerFunctionParallelSafe(cstate->rel->trigdesc))
+		return false;
+
+	/*
+	 * Check if there is after statement or instead of trigger or transition
+	 * table triggers.
+	 */
+	if (cstate->rel->trigdesc != NULL &&
+		(cstate->rel->trigdesc->trig_insert_after_statement ||
+		 cstate->rel->trigdesc->trig_insert_instead_row ||
+		 cstate->rel->trigdesc->trig_insert_new_table))
+		return false;
+
+	/* Check if the volatile expressions are parallel safe, if present any. */
+	if (!CheckExprParallelSafety(cstate))
+		return false;
+
+	/* Check if the insertion mode is single. */
+	if (FindInsertMethod(cstate) == CIM_SINGLE)
+		return false;
+
+	return true;
+}
+
+/*
  * BeginParallelCopy - start parallel copy tasks.
  *
  * Get the number of workers required to perform the parallel copy. The data
@@ -855,6 +1045,8 @@ BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid)
 	int  parallel_workers = 0;
 	ParallelCopyData *pcdata;
 
+	CheckCopyFromValidity(cstate);
+
 	parallel_workers = Min(nworkers, max_worker_processes);
 
 	/* Can't perform copy in parallel */
@@ -864,6 +1056,15 @@ BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid)
 	pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData));
 	cstate->pcdata = pcdata;
 
+	/*
+	 * User chosen parallel copy. Determine if the parallel copy is actually
+	 * allowed. If not, go with the non-parallel mode.
+	 */
+	if (!IsParallelCopyAllowed(cstate))
+		return NULL;
+
+	full_transaction_id = GetCurrentFullTransactionId();
+
 	EnterParallelMode();
 	pcxt = CreateParallelContext("postgres", "ParallelCopyMain",
 								 parallel_workers);
@@ -1090,7 +1291,211 @@ ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate,
 	cstate->line_buf_converted = false;
 	cstate->raw_buf = NULL;
 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
+
+	PopulateCatalogInformation(cstate);
+
+	/* Create workspace for CopyReadAttributes results. */
+	if (!cstate->binary)
+	{
+		AttrNumber	attr_count = list_length(cstate->attnumlist);
+
+		cstate->max_fields = attr_count;
+		cstate->raw_fields = (char **)palloc(attr_count * sizeof(char *));
+	}
 }
+
+/*
+ * CacheChunkInfo - Cache the chunk information to local memory.
+ */
+static bool
+CacheChunkInfo(CopyState cstate, uint32 buff_count)
+{
+	ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
+	ParallelCopyData *pcdata = cstate->pcdata;
+	uint32			write_pos;
+	ParallelCopyDataBlock *data_blk_ptr;
+	ParallelCopyChunkBoundary *chunkInfo;
+	uint32 offset;
+	int dataSize;
+	int copiedSize = 0;
+
+	resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf);
+	write_pos = GetChunkPosition(cstate);
+	if (-1 == write_pos)
+		return true;
+
+	/* Get the current chunk information. */
+	chunkInfo = &pcshared_info->chunk_boundaries.ring[write_pos];
+	if (pg_atomic_read_u32(&chunkInfo->chunk_size) == 0)
+		goto empty_data_chunk_update;
+
+	/* Get the block information. */
+	data_blk_ptr = &pcshared_info->data_blocks[chunkInfo->first_block];
+
+	/* Get the offset information from where the data must be copied. */
+	offset = chunkInfo->start_offset;
+	pcdata->worker_line_buf[buff_count].cur_lineno = chunkInfo->cur_lineno;
+
+	elog(DEBUG1, "[Worker] Processing - chunk position:%d, block:%d, unprocessed chunks:%d, offset:%d, chunk size:%d",
+				 write_pos, chunkInfo->first_block,
+				 pg_atomic_read_u32(&data_blk_ptr->unprocessed_chunk_parts),
+				 offset, pg_atomic_read_u32(&chunkInfo->chunk_size));
+
+	for (;;)
+	{
+		uint8 skip_bytes = data_blk_ptr->skip_bytes;
+		/*
+		 * There is a possibility that the above loop has come out because
+		 * data_blk_ptr->curr_blk_completed is set, but dataSize read might
+		 * be an old value, if data_blk_ptr->curr_blk_completed and the chunk is
+		 * completed, chunk_size will be set. Read the chunk_size again to be
+		 * sure if it is complete or partial block.
+		 */
+		dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size);
+		if (dataSize)
+		{
+			int remainingSize = dataSize - copiedSize;
+			if (!remainingSize)
+				break;
+
+			/* Whole chunk is in current block. */
+			if (remainingSize + offset + skip_bytes < DATA_BLOCK_SIZE)
+			{
+				appendBinaryStringInfo(&pcdata->worker_line_buf[buff_count].line_buf,
+									   &data_blk_ptr->data[offset],
+									   remainingSize);
+				pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts,
+										1);
+				break;
+			}
+			else
+			{
+				/* Chunk is spread across the blocks. */
+				uint32 chunkInCurrentBlock =  (DATA_BLOCK_SIZE - skip_bytes) - offset;
+				appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf,
+										 &data_blk_ptr->data[offset],
+										 chunkInCurrentBlock);
+				pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, 1);
+				copiedSize += chunkInCurrentBlock;
+				while (copiedSize < dataSize)
+				{
+					uint32 currentBlockCopySize;
+					ParallelCopyDataBlock *currBlkPtr = &pcshared_info->data_blocks[data_blk_ptr->following_block];
+					skip_bytes = currBlkPtr->skip_bytes;
+
+					/*
+					 * If complete data is present in current block use
+					 * dataSize - copiedSize, or copy the whole block from
+					 * current block.
+					 */
+					currentBlockCopySize = Min(dataSize - copiedSize, DATA_BLOCK_SIZE - skip_bytes);
+					appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf,
+											 &currBlkPtr->data[0],
+											 currentBlockCopySize);
+					pg_atomic_sub_fetch_u32(&currBlkPtr->unprocessed_chunk_parts, 1);
+					copiedSize += currentBlockCopySize;
+					data_blk_ptr = currBlkPtr;
+				}
+
+				break;
+			}
+		}
+		else
+		{
+			/* Copy this complete block from the current offset. */
+			uint32 chunkInCurrentBlock =  (DATA_BLOCK_SIZE - skip_bytes) - offset;
+			appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf,
+									 &data_blk_ptr->data[offset],
+									 chunkInCurrentBlock);
+			pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_chunk_parts, 1);
+			copiedSize += chunkInCurrentBlock;
+
+			/*
+			 * Reset the offset. For the first copy, copy from the offset. For
+			 * the subsequent copy the complete block.
+			 */
+			offset = 0;
+
+			/* Set data_blk_ptr to the following block. */
+			data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block];
+		}
+
+		for (;;)
+		{
+			/* Get the size of this chunk */
+			dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size);
+
+			/*
+			 * If the data is present in current block chunkInfo.chunk_size
+			 * will be updated. If the data is spread across the blocks either
+			 * of chunkInfo.chunk_size or data_blk_ptr->curr_blk_completed can
+			 * be updated. chunkInfo.chunk_size will be updated if the complete
+			 * read is finished. data_blk_ptr->curr_blk_completed will be
+			 * updated if processing of current block is finished and data
+			 * processing is not finished.
+			 */
+			if (data_blk_ptr->curr_blk_completed || (dataSize != -1))
+				break;
+
+			COPY_WAIT_TO_PROCESS()
+		}
+	}
+
+empty_data_chunk_update:
+	elog(DEBUG1, "[Worker] Completed processing chunk:%d", write_pos);
+	pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_WORKER_PROCESSED);
+	pg_atomic_write_u32(&chunkInfo->chunk_size, -1);
+	pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1);
+	return false;
+}
+
+/*
+ * GetWorkerChunk - Returns a chunk for worker to process.
+ */
+static bool
+GetWorkerChunk(CopyState cstate)
+{
+	uint32 buff_count;
+	ParallelCopyData *pcdata = cstate->pcdata;
+
+	/*
+	 * Copy the chunk data to line_buf and release the chunk position so that the
+	 * worker can continue loading data.
+	 */
+	if (pcdata->worker_line_buf_pos < pcdata->worker_line_buf_count)
+		goto return_chunk;
+
+	pcdata->worker_line_buf_pos = 0;
+	pcdata->worker_line_buf_count = 0;
+
+	for (buff_count = 0; buff_count < WORKER_CHUNK_COUNT; buff_count++)
+	{
+		bool result = CacheChunkInfo(cstate, buff_count);
+		if (result)
+			break;
+
+		pcdata->worker_line_buf_count++;
+	}
+
+	if (pcdata->worker_line_buf_count)
+		goto return_chunk;
+	else
+		resetStringInfo(&cstate->line_buf);
+
+	return true;
+
+return_chunk:
+	cstate->line_buf = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].line_buf;
+	cstate->cur_lineno = pcdata->worker_line_buf[pcdata->worker_line_buf_pos].cur_lineno;
+	cstate->line_buf_valid = true;
+
+	/* Mark that encoding conversion hasn't occurred yet. */
+	cstate->line_buf_converted = false;
+	CONVERT_TO_SERVER_ENCODING(cstate)
+	pcdata->worker_line_buf_pos++;
+	return false;
+}
+
 /*
  * ParallelCopyMain - parallel copy worker's code.
  *
@@ -1136,6 +1541,7 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc)
 	ereport(DEBUG1, (errmsg("Starting parallel copy worker")));
 
 	pcdata->pcshared_info = pcshared_info;
+	AssignFullTransactionIdForWorker(pcshared_info->full_transaction_id);
 
 	shared_cstate = (ParallelCopyCommonKeyData *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE,
 														   false);
@@ -1188,6 +1594,34 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc)
 	MemoryContextSwitchTo(oldcontext);
 	return;
 }
+
+/*
+ * UpdateBlockInChunkInfo - Update the chunk information.
+ */
+static pg_attribute_always_inline int
+UpdateBlockInChunkInfo(CopyState cstate, uint32 blk_pos,
+					   uint32 offset, uint32 chunk_size, uint32 chunk_state)
+{
+	ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
+	ParallelCopyChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries;
+	ParallelCopyChunkBoundary *chunkInfo;
+	int chunk_pos = chunkBoundaryPtr->leader_pos;
+
+	/* Update the chunk information for the worker to pick and process. */
+	chunkInfo = &chunkBoundaryPtr->ring[chunk_pos];
+	while (pg_atomic_read_u32(&chunkInfo->chunk_size) != -1)
+		COPY_WAIT_TO_PROCESS()
+
+	chunkInfo->first_block = blk_pos;
+	chunkInfo->start_offset = offset;
+	chunkInfo->cur_lineno = cstate->cur_lineno;
+	pg_atomic_write_u32(&chunkInfo->chunk_size, chunk_size);
+	pg_atomic_write_u32(&chunkInfo->chunk_state, chunk_state);
+	chunkBoundaryPtr->leader_pos = (chunkBoundaryPtr->leader_pos + 1) % RINGSIZE;
+
+	return chunk_pos;
+}
+
 /*
  * ParallelCopyLeader - parallel copy leader's functionality.
  *
@@ -1213,9 +1647,158 @@ ParallelCopyLeader(CopyState cstate)
 	ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
 	ereport(DEBUG1, (errmsg("Running parallel copy leader")));
 
+	/* Execute the before statement triggers from the leader */
+	ExecBeforeStmtTrigger(cstate);
+
+	/* On input just throw the header line away. */
+	if (cstate->cur_lineno == 0 && cstate->header_line)
+	{
+		cstate->cur_lineno++;
+		if (CopyReadLine(cstate))
+		{
+			pcshared_info->is_read_in_progress = false;
+			return;			/* done */
+		}
+	}
+
+	for (;;)
+	{
+		bool done;
+		cstate->cur_lineno++;
+
+		/* Actually read the line into memory here. */
+		done = CopyReadLine(cstate);
+
+		/*
+		 * EOF at start of line means we're done.  If we see EOF after some
+		 * characters, we act as though it was newline followed by EOF, ie,
+		 * process the line and then exit loop on next iteration.
+		 */
+		if (done && cstate->line_buf.len == 0)
+			break;
+	}
+
 	pcshared_info->is_read_in_progress = false;
 	cstate->cur_lineno = 0;
 }
+
+/*
+ * GetChunkPosition - return the chunk position that worker should process.
+ */
+static uint32
+GetChunkPosition(CopyState cstate)
+{
+	ParallelCopyData *pcdata = cstate->pcdata;
+	ParallelCopyShmInfo *pcshared_info = pcdata->pcshared_info;
+	uint32  previous_pos = pcdata->worker_processed_pos;
+	uint32 write_pos = (previous_pos == -1) ? 0 : (previous_pos + 1) % RINGSIZE;
+	for (;;)
+	{
+		int		dataSize;
+		bool	is_read_in_progress = pcshared_info->is_read_in_progress;
+		ParallelCopyChunkBoundary *chunkInfo;
+		ParallelCopyDataBlock *data_blk_ptr;
+		ParallelCopyChunkState chunk_state = CHUNK_LEADER_POPULATED;
+		ParallelCopyChunkState curr_chunk_state;
+		CHECK_FOR_INTERRUPTS();
+
+		/* File read completed & no elements to process. */
+		if (!is_read_in_progress &&
+			(pcshared_info->populated ==
+			 pg_atomic_read_u64(&pcshared_info->total_worker_processed)))
+		{
+			write_pos = -1;
+			break;
+		}
+
+		/* Get the current chunk information. */
+		chunkInfo = &pcshared_info->chunk_boundaries.ring[write_pos];
+		curr_chunk_state = pg_atomic_read_u32(&chunkInfo->chunk_state);
+		if ((write_pos % WORKER_CHUNK_COUNT == 0) &&
+			(curr_chunk_state == CHUNK_WORKER_PROCESSED ||
+			 curr_chunk_state == CHUNK_WORKER_PROCESSING))
+		{
+			pcdata->worker_processed_pos = write_pos;
+			write_pos = (write_pos + WORKER_CHUNK_COUNT) %  RINGSIZE;
+			continue;
+		}
+
+		/* Get the size of this chunk. */
+		dataSize = pg_atomic_read_u32(&chunkInfo->chunk_size);
+
+		if (dataSize != 0) /* If not an empty chunk. */
+		{
+			/* Get the block information. */
+			data_blk_ptr = &pcshared_info->data_blocks[chunkInfo->first_block];
+
+			if (!data_blk_ptr->curr_blk_completed && (dataSize == -1))
+			{
+				/* Wait till the current chunk or block is added. */
+				COPY_WAIT_TO_PROCESS()
+				continue;
+			}
+		}
+
+		/* Make sure that no worker has consumed this element. */
+		if (pg_atomic_compare_exchange_u32(&chunkInfo->chunk_state,
+										   &chunk_state, CHUNK_WORKER_PROCESSING))
+			break;
+	}
+
+	pcdata->worker_processed_pos = write_pos;
+	return write_pos;
+}
+
+/*
+ * GetFreeCopyBlock - Get a free block for data to be copied.
+ */
+static pg_attribute_always_inline uint32
+GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info)
+{
+	int count = 0;
+	uint32 last_free_block = pcshared_info->cur_block_pos;
+	uint32 block_pos = (last_free_block != -1) ? ((last_free_block + 1) % MAX_BLOCKS_COUNT): 0;
+
+	/* Get a new block for copying data. */
+	while (count < MAX_BLOCKS_COUNT)
+	{
+		ParallelCopyDataBlock *dataBlkPtr = &pcshared_info->data_blocks[block_pos];
+		uint32 unprocessed_chunk_parts = pg_atomic_read_u32(&dataBlkPtr->unprocessed_chunk_parts);
+		if (unprocessed_chunk_parts == 0)
+		{
+			dataBlkPtr->curr_blk_completed = false;
+			dataBlkPtr->skip_bytes = 0;
+			pcshared_info->cur_block_pos = block_pos;
+			return block_pos;
+		}
+
+		block_pos = (block_pos + 1) % MAX_BLOCKS_COUNT;
+		count++;
+	}
+
+	return -1;
+}
+
+/*
+ * WaitGetFreeCopyBlock - If there are no blocks available, wait and get a block
+ * for copying data.
+ */
+static uint32
+WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info)
+{
+	uint32 new_free_pos = -1;
+	for (;;)
+	{
+		new_free_pos = GetFreeCopyBlock(pcshared_info);
+		if (new_free_pos != -1)	/* We have got one block, break now. */
+			break;
+
+		COPY_WAIT_TO_PROCESS()
+	}
+
+	return new_free_pos;
+}
+
 /*
  * LookupParallelCopyFnPtr - Look up parallel copy function pointer.
  */
@@ -1251,6 +1834,146 @@ LookupParallelCopyFnStr(copy_data_source_cb fn_addr)
 	/* We can only reach this by programming error. */
 	elog(ERROR, "internal function pointer not found");
 }
+
+/*
+ * SetRawBufForLoad - Set raw_buf to the shared memory data block into which the
+ * file data must be read.
+ */
+static void
+SetRawBufForLoad(CopyState cstate, uint32 chunk_size, uint32 copy_buf_len,
+				 uint32 raw_buf_ptr, char **copy_raw_buf)
+{
+	ParallelCopyShmInfo	*pcshared_info;
+	uint32 cur_block_pos;
+	uint32 next_block_pos;
+	ParallelCopyDataBlock	*cur_data_blk_ptr = NULL;
+	ParallelCopyDataBlock	*next_data_blk_ptr = NULL;
+
+	if (!IsParallelCopy())
+		return;
+
+	pcshared_info = cstate->pcdata->pcshared_info;
+	cur_block_pos = pcshared_info->cur_block_pos;
+	cur_data_blk_ptr = (cstate->raw_buf) ? &pcshared_info->data_blocks[cur_block_pos] : NULL;
+	next_block_pos = WaitGetFreeCopyBlock(pcshared_info);
+	next_data_blk_ptr = &pcshared_info->data_blocks[next_block_pos];
+
+	/* set raw_buf to the data block in shared memory */
+	cstate->raw_buf = next_data_blk_ptr->data;
+	*copy_raw_buf = cstate->raw_buf;
+	if (cur_data_blk_ptr && chunk_size)
+	{
+		/*
+		 * Mark the previous block as completed so that a worker can start
+		 * copying this data.
+		 */
+		cur_data_blk_ptr->following_block = next_block_pos;
+		pg_atomic_add_fetch_u32(&cur_data_blk_ptr->unprocessed_chunk_parts, 1);
+		cur_data_blk_ptr->skip_bytes = copy_buf_len - raw_buf_ptr;
+		cur_data_blk_ptr->curr_blk_completed = true;
+	}
+}
+
+/*
+ * EndChunkParallelCopy - Update the chunk information in shared memory.
+ */
+static void
+EndChunkParallelCopy(CopyState cstate, uint32 chunk_pos, uint32 chunk_size,
+					 uint32 raw_buf_ptr)
+{
+	uint8 new_line_size;
+	if (!IsParallelCopy())
+		return;
+
+	if (!IsHeaderLine())
+	{
+		ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
+		ParallelCopyChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries;
+		SET_NEWLINE_SIZE()
+		if (chunk_size)
+		{
+			ParallelCopyChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos];
+			/*
+			 * If new_line_size > raw_buf_ptr, the new block contains only the
+			 * newline character; the unprocessed count should not be increased
+			 * in that case.
+			 */
+			if (raw_buf_ptr > new_line_size)
+			{
+				uint32 cur_block_pos = pcshared_info->cur_block_pos;
+				ParallelCopyDataBlock *curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos];
+				pg_atomic_add_fetch_u32(&curr_data_blk_ptr->unprocessed_chunk_parts, 1);
+			}
+
+			/* Update chunk size. */
+			pg_atomic_write_u32(&chunkInfo->chunk_size, chunk_size);
+			pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED);
+			elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d",
+						 chunk_pos, chunk_size);
+			pcshared_info->populated++;
+		}
+		else if (new_line_size)
+		{
+			/* Only a newline character was seen; an empty record should be inserted. */
+			ParallelCopyChunkBoundary *chunkInfo;
+			chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0,
+											   CHUNK_LEADER_POPULATED);
+			chunkInfo = &chunkBoundaryPtr->ring[chunk_pos];
+			elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d",
+						 chunkInfo->start_offset, chunk_pos,
+						 pg_atomic_read_u32(&chunkInfo->chunk_size));
+			pcshared_info->populated++;
+		}
+	}
+}
+
+/*
+ * ExecBeforeStmtTrigger - Execute the BEFORE STATEMENT triggers; for parallel
+ * copy this is done by the leader process.
+ */
+static void
+ExecBeforeStmtTrigger(CopyState cstate)
+{
+	EState	   *estate = CreateExecutorState();
+	ResultRelInfo *resultRelInfo;
+
+	Assert(IsLeader());
+
+	/*
+	 * We need a ResultRelInfo so we can use the regular executor's
+	 * index-entry-making machinery.  (There used to be a huge amount of code
+	 * here that basically duplicated execUtils.c ...)
+	 */
+	resultRelInfo = makeNode(ResultRelInfo);
+	InitResultRelInfo(resultRelInfo,
+					  cstate->rel,
+					  1,		/* must match rel's position in range_table */
+					  NULL,
+					  0);
+
+	/* Verify the named relation is a valid target for INSERT */
+	CheckValidResultRel(resultRelInfo, CMD_INSERT);
+
+	estate->es_result_relations = resultRelInfo;
+	estate->es_num_result_relations = 1;
+	estate->es_result_relation_info = resultRelInfo;
+
+	ExecInitRangeTable(estate, cstate->range_table);
+
+	/*
+	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
+	 * should do this for COPY, since it's not really an "INSERT" statement as
+	 * such. However, executing these triggers maintains consistency with the
+	 * EACH ROW triggers that we already fire on COPY.
+	 */
+	ExecBSInsertTriggers(estate, resultRelInfo);
+
+	/* Close any trigger target relations */
+	ExecCleanUpTriggerState(estate);
+
+	FreeExecutorState(estate);
+}
+
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
  * in past protocol redesigns.
@@ -3611,7 +4334,8 @@ CopyFrom(CopyState cstate)
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
-	CommandId	mycid = GetCurrentCommandId(true);
+	CommandId	mycid = IsParallelCopy() ? cstate->pcdata->pcshared_info->mycid :
+										   GetCurrentCommandId(true);
 	int			ti_options = 0; /* start with default options for insert */
 	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
@@ -3621,7 +4345,14 @@ CopyFrom(CopyState cstate)
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
 
-	CheckCopyFromValidity(cstate);
+	/*
+	 * Perform this check only if it is not a parallel copy. In the parallel
+	 * copy case this check is done by the leader, so that if any invalid case
+	 * exists the COPY FROM command errors out in the leader itself, avoiding
+	 * launching workers just to throw an error.
+	 */
+	if (!IsParallelCopy())
+		CheckCopyFromValidity(cstate);
 
 	/*
 	 * If the target file is new-in-transaction, we assume that checking FSM
@@ -3848,13 +4579,16 @@ CopyFrom(CopyState cstate)
 	has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
 								   resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
 
-	/*
-	 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
-	 * should do this for COPY, since it's not really an "INSERT" statement as
-	 * such. However, executing these triggers maintains consistency with the
-	 * EACH ROW triggers that we already fire on COPY.
-	 */
-	ExecBSInsertTriggers(estate, resultRelInfo);
+	if (!IsParallelCopy())
+	{
+		/*
+		 * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
+		 * should do this for COPY, since it's not really an "INSERT" statement as
+		 * such. However, executing these triggers maintains consistency with the
+		 * EACH ROW triggers that we already fire on COPY.
+		 */
+		ExecBSInsertTriggers(estate, resultRelInfo);
+	}
 
 	econtext = GetPerTupleExprContext(estate);
 
@@ -4368,7 +5102,7 @@ BeginCopyFrom(ParseState *pstate,
 	initStringInfo(&cstate->attribute_buf);
 	initStringInfo(&cstate->line_buf);
 	cstate->line_buf_converted = false;
-	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+	cstate->raw_buf = (IsParallelCopy()) ? NULL : (char *) palloc(RAW_BUF_SIZE + 1);
 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
 
 	/* Assign range table, we'll need it in CopyFrom. */
@@ -4512,26 +5246,35 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
 	/* only available for text or csv input */
 	Assert(!cstate->binary);
 
-	/* on input just throw the header line away */
-	if (cstate->cur_lineno == 0 && cstate->header_line)
+	if (IsParallelCopy())
 	{
-		cstate->cur_lineno++;
-		if (CopyReadLine(cstate))
-			return false;		/* done */
+		done = GetWorkerChunk(cstate);
+		if (done && cstate->line_buf.len == 0)
+			return false;
 	}
+	else
+	{
+		/* on input just throw the header line away */
+		if (cstate->cur_lineno == 0 && cstate->header_line)
+		{
+			cstate->cur_lineno++;
+			if (CopyReadLine(cstate))
+				return false;		/* done */
+		}
 
-	cstate->cur_lineno++;
+		cstate->cur_lineno++;
 
-	/* Actually read the line into memory here */
-	done = CopyReadLine(cstate);
+		/* Actually read the line into memory here */
+		done = CopyReadLine(cstate);
 
-	/*
-	 * EOF at start of line means we're done.  If we see EOF after some
-	 * characters, we act as though it was newline followed by EOF, ie,
-	 * process the line and then exit loop on next iteration.
-	 */
-	if (done && cstate->line_buf.len == 0)
-		return false;
+		/*
+		 * EOF at start of line means we're done.  If we see EOF after some
+		 * characters, we act as though it was newline followed by EOF, ie,
+		 * process the line and then exit loop on next iteration.
+		 */
+		if (done && cstate->line_buf.len == 0)
+			return false;
+	}
 
 	/* Parse the line into de-escaped field values */
 	if (cstate->csv_mode)
@@ -4781,9 +5524,31 @@ CopyReadLine(CopyState cstate)
 		 */
 		if (cstate->copy_dest == COPY_NEW_FE)
 		{
+			bool bIsFirst = true;
 			do
 			{
-				cstate->raw_buf_index = cstate->raw_buf_len;
+				if (!IsParallelCopy())
+					cstate->raw_buf_index = cstate->raw_buf_len;
+				else
+				{
+					if (cstate->raw_buf_index == RAW_BUF_SIZE)
+					{
+						/*
+						 * Get a new block the first time through; on
+						 * subsequent iterations, reset the index and reuse
+						 * the same block.
+						 */
+						if (bIsFirst)
+						{
+							ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
+							uint32 block_pos = WaitGetFreeCopyBlock(pcshared_info);
+							cstate->raw_buf = pcshared_info->data_blocks[block_pos].data;
+							bIsFirst = false;
+						}
+
+						cstate->raw_buf_index = cstate->raw_buf_len = 0;
+					}
+				}
+
 			} while (CopyLoadRawBuf(cstate));
 		}
 	}
@@ -4813,6 +5578,11 @@ CopyReadLineText(CopyState cstate)
 	char		quotec = '\0';
 	char		escapec = '\0';
 
+	/* For parallel copy */
+	uint32			 chunk_size = 0;
+	int				 chunk_pos = 0;
+
+	cstate->eol_type = EOL_UNKNOWN;
 	if (cstate->csv_mode)
 	{
 		quotec = cstate->quote[0];
@@ -4867,6 +5637,8 @@ CopyReadLineText(CopyState cstate)
 		if (raw_buf_ptr >= copy_buf_len || need_data)
 		{
 			REFILL_LINEBUF;
+			SetRawBufForLoad(cstate, chunk_size, copy_buf_len, raw_buf_ptr,
+							 &copy_raw_buf);
 
 			/*
 			 * Try to read some more data.  This will certainly reset
@@ -5091,9 +5863,15 @@ CopyReadLineText(CopyState cstate)
 				 * discard the data and the \. sequence.
 				 */
 				if (prev_raw_ptr > cstate->raw_buf_index)
-					appendBinaryStringInfo(&cstate->line_buf,
+				{
+					if (!IsParallelCopy())
+						appendBinaryStringInfo(&cstate->line_buf,
 										   cstate->raw_buf + cstate->raw_buf_index,
 										   prev_raw_ptr - cstate->raw_buf_index);
+					else
+						chunk_size += prev_raw_ptr - cstate->raw_buf_index;
+				}
+
 				cstate->raw_buf_index = raw_buf_ptr;
 				result = true;	/* report EOF */
 				break;
@@ -5145,6 +5923,26 @@ not_end_of_copy:
 			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
 			raw_buf_ptr += mblen - 1;
 		}
+
+		/*
+		 * Skip the header line. The chunk information is updated here rather
+		 * than at the beginning, as the file may contain empty lines.
+		 */
+		if (IsParallelCopy() && first_char_in_line && !IsHeaderLine())
+		{
+			ParallelCopyShmInfo	*pcshared_info = cstate->pcdata->pcshared_info;
+			ParallelCopyChunkBoundary *chunkInfo;
+			uint32			 chunk_first_block = pcshared_info->cur_block_pos;
+			chunk_pos = UpdateBlockInChunkInfo(cstate,
+											   chunk_first_block,
+											   cstate->raw_buf_index, -1,
+											   CHUNK_LEADER_POPULATING);
+			chunkInfo = &pcshared_info->chunk_boundaries.ring[chunk_pos];
+			elog(DEBUG1, "[Leader] Adding - block:%d, offset:%d, chunk position:%d",
+						 chunk_first_block,	chunkInfo->start_offset, chunk_pos);
+		}
+
 		first_char_in_line = false;
 	}							/* end of outer loop */
 
@@ -5153,6 +5951,7 @@ not_end_of_copy:
 	 */
 	REFILL_LINEBUF;
 	CLEAR_EOL_LINE()
+	EndChunkParallelCopy(cstate, chunk_pos, chunk_size, raw_buf_ptr);
 
 	return result;
 }
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 0c6fe01..3faadb8 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -865,7 +865,7 @@ is_parallel_safe(PlannerInfo *root, Node *node)
 	 * planning, because those are parallel-restricted and there might be one
 	 * in this expression.  But otherwise we don't need to look.
 	 */
-	if (root->glob->maxParallelHazard == PROPARALLEL_SAFE &&
+	if (root != NULL && root->glob->maxParallelHazard == PROPARALLEL_SAFE &&
 		root->glob->paramExecTypes == NIL)
 		return true;
 	/* Else use max_parallel_hazard's search logic, but stop on RESTRICTED */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 88025b1..f8bdcc3 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -381,6 +381,7 @@ extern FullTransactionId GetTopFullTransactionId(void);
 extern FullTransactionId GetTopFullTransactionIdIfAny(void);
 extern FullTransactionId GetCurrentFullTransactionId(void);
 extern FullTransactionId GetCurrentFullTransactionIdIfAny(void);
+extern void AssignFullTransactionIdForWorker(FullTransactionId fullTransactionId);
 extern void MarkCurrentTransactionIdLoggedIfAny(void);
 extern bool SubTransactionIsActive(SubTransactionId subxid);
 extern CommandId GetCurrentCommandId(bool used);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3373894..30eb49d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1700,6 +1700,7 @@ ParallelCompletionPtr
 ParallelContext
 ParallelCopyChunkBoundaries
 ParallelCopyChunkBoundary
+ParallelCopyChunkState
 ParallelCopyCommonKeyData
 ParallelCopyData
 ParallelCopyDataBlock
-- 
1.8.3.1
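
To illustrate the hand-off that GetChunkPosition() in the patch above
implements: the leader marks a ring entry CHUNK_LEADER_POPULATED and a worker
claims it by switching chunk_state to CHUNK_WORKER_PROCESSING with
pg_atomic_compare_exchange_u32(), so no two workers pick up the same chunk.
Below is a minimal standalone sketch of that claim protocol; it uses C11
atomics in place of pg_atomic_*, a tiny ring, and an extra CHUNK_INIT zero
value that exists only in this sketch. It is not code from the patch.

#include <stdatomic.h>
#include <stdio.h>

#define RINGSIZE 8

typedef enum
{
	CHUNK_INIT,					/* zero value: unused slot (sketch only) */
	CHUNK_LEADER_POPULATING,
	CHUNK_LEADER_POPULATED,
	CHUNK_WORKER_PROCESSING,
	CHUNK_WORKER_PROCESSED
} ChunkState;

typedef struct
{
	atomic_int	chunk_state;	/* one of ChunkState */
	int			chunk_size;		/* bytes in this chunk */
} ChunkBoundary;

static ChunkBoundary ring[RINGSIZE];

/* Leader side: publish a chunk of the given size at the given slot. */
static void
leader_populate(int pos, int size)
{
	ring[pos].chunk_size = size;
	atomic_store(&ring[pos].chunk_state, CHUNK_LEADER_POPULATED);
}

/* Worker side: claim one populated chunk; return its position or -1. */
static int
worker_claim(void)
{
	for (int pos = 0; pos < RINGSIZE; pos++)
	{
		int			expected = CHUNK_LEADER_POPULATED;

		/* Only one worker can win this transition for a given slot. */
		if (atomic_compare_exchange_strong(&ring[pos].chunk_state,
										   &expected, CHUNK_WORKER_PROCESSING))
			return pos;
	}
	return -1;					/* nothing available yet */
}

int
main(void)
{
	int			pos;

	leader_populate(0, 128);
	leader_populate(1, 64);

	while ((pos = worker_claim()) != -1)
	{
		printf("claimed chunk %d (%d bytes)\n", pos, ring[pos].chunk_size);
		atomic_store(&ring[pos].chunk_state, CHUNK_WORKER_PROCESSED);
	}
	return 0;
}

The real GetChunkPosition() additionally walks the ring in
WORKER_CHUNK_COUNT-sized batches and waits when the leader has not yet filled
the next chunk; the sketch only shows the state transition that makes the
claim safe.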

From 8a4d7943545a16d980ae06dcc9f25b6a6b0b5a92 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 10 Jun 2020 06:53:04 +0530
Subject: [PATCH 2/4] Framework for leader/worker in parallel copy.

This patch adds the framework for parallel copy: the data structures, leader
initialization, worker initialization, shared memory updates, starting workers,
waiting for workers, and worker exit handling.
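
Among the data structures added here, ParallelCopyDataBlock lets a single
chunk span multiple 64K data blocks: SetRawBufForLoad() in the patch above
chains blocks via following_block and records the unused tail of a block in
skip_bytes. A small standalone sketch of how a worker could stitch such a
chunk back together follows (tiny blocks, a made-up CSV line, and simplified
fields; this is not the patch's worker code):

#include <stdio.h>
#include <string.h>

#define DATA_BLOCK_SIZE 16		/* tiny blocks, just for the demo */

typedef struct
{
	int		following_block;	/* next block of this chunk, -1 if none */
	int		skip_bytes;			/* unused trailing bytes in this block */
	char	data[DATA_BLOCK_SIZE];
} DataBlock;

/* Copy chunk_size bytes starting at (first_block, start_offset) into out. */
static void
reassemble_chunk(DataBlock *blocks, int first_block, int start_offset,
				 int chunk_size, char *out)
{
	int		blk = first_block;
	int		off = start_offset;
	int		copied = 0;

	while (copied < chunk_size)
	{
		int		avail = DATA_BLOCK_SIZE - blocks[blk].skip_bytes - off;
		int		n = (chunk_size - copied < avail) ? chunk_size - copied : avail;

		memcpy(out + copied, blocks[blk].data + off, n);
		copied += n;
		blk = blocks[blk].following_block;
		off = 0;				/* a continuation starts at the block's beginning */
	}
	out[copied] = '\0';
}

int
main(void)
{
	DataBlock	blocks[2] = {
		{.following_block = 1, .skip_bytes = 0},
		{.following_block = -1, .skip_bytes = 0}
	};
	char		line[64];

	memcpy(blocks[0].data, "1,apple,fresh fr", DATA_BLOCK_SIZE);
	memcpy(blocks[1].data, "uit\n", 4);

	reassemble_chunk(blocks, 0, 0, 20, line);
	printf("chunk: %s", line);
	return 0;
}
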
---
 src/backend/access/transam/parallel.c       |   4 +
 src/backend/commands/copy.c                 | 812 +++++++++++++++++++++++++++-
 src/backend/replication/logical/tablesync.c |   2 +-
 src/include/commands/copy.h                 |   4 +
 src/tools/pgindent/typedefs.list            |   8 +
 5 files changed, 828 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 14a8690..09e7a19 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/copy.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -145,6 +146,9 @@ static const struct
 	},
 	{
 		"parallel_vacuum_main", parallel_vacuum_main
+	},
+	{
+		"ParallelCopyMain", ParallelCopyMain
 	}
 };
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index eaf0f78..d930644 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -96,9 +96,127 @@ typedef enum CopyInsertMethod
 } CopyInsertMethod;
 
 #define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
+
+#define DATA_BLOCK_SIZE RAW_BUF_SIZE
+#define RINGSIZE (10 * 1000)
+#define MAX_BLOCKS_COUNT 1000
+#define WORKER_CHUNK_COUNT 50	/* RINGSIZE must be a multiple of this */
+
+#define	IsParallelCopy()		(cstate->is_parallel)
+#define IsLeader()				(cstate->pcdata->is_leader)
 #define IsHeaderLine()			(cstate->header_line && cstate->cur_lineno == 1)
 
 /*
+ * Copy data block information.
+ */
+typedef struct ParallelCopyDataBlock
+{
+	/* The number of unprocessed chunk parts in the current block. */
+	pg_atomic_uint32 unprocessed_chunk_parts;
+
+	/*
+	 * If the current chunk's data continues into another block, following_block
+	 * holds the position from which the remaining data needs to be read.
+	 */
+	uint32	following_block;
+
+	/*
+	 * This flag is set when the leader determines that this block can be read
+	 * safely by a worker. It lets a worker start processing early when a chunk
+	 * is spread across many blocks, instead of waiting for the complete chunk
+	 * to be populated.
+	 */
+	bool   curr_blk_completed;
+	char   data[DATA_BLOCK_SIZE]; /* data read from file */
+	uint8  skip_bytes;
+}ParallelCopyDataBlock;
+
+/*
+ * Individual Chunk information.
+ */
+typedef struct ParallelCopyChunkBoundary
+{
+	/* Position of the first block in data_blocks array. */
+	uint32				first_block;
+	uint32				start_offset;   /* start offset of the chunk */
+
+	/*
+	 * Size of the current chunk: -1 means the chunk is yet to be filled
+	 * completely, 0 means an empty chunk, and >0 means the chunk contains that
+	 * many bytes of data.
+	 */
+	pg_atomic_uint32	chunk_size;
+	pg_atomic_uint32	chunk_state;	/* chunk state */
+	uint64				cur_lineno;		/* line number for error messages */
+}ParallelCopyChunkBoundary;
+
+/*
+ * Array of the chunk.
+ */
+typedef struct ParallelCopyChunkBoundaries
+{
+	/* Position for the leader to populate a chunk. */
+	uint32			leader_pos;
+
+	/* Ring of chunk boundaries populated by the leader process. */
+	ParallelCopyChunkBoundary	ring[RINGSIZE];
+}ParallelCopyChunkBoundaries;
+
+/*
+ * Shared information among parallel copy workers. This will be allocated in the
+ * DSM segment.
+ */
+typedef struct ParallelCopyShmInfo
+{
+	bool					is_read_in_progress; /* file read status */
+
+	/*
+	 * Number of chunks actually inserted by the workers (some records may be
+	 * filtered out by the WHERE condition).
+	 */
+	pg_atomic_uint64		processed;
+	pg_atomic_uint64		total_worker_processed; /* total processed records by the workers */
+	uint64					populated; /* Chunks populated by leader */
+	uint32					cur_block_pos; /* current data block */
+	ParallelCopyDataBlock			data_blocks[MAX_BLOCKS_COUNT]; /* data block array */
+	FullTransactionId		full_transaction_id; /* xid for copy from statement */
+	CommandId				mycid;	/* command id */
+	ParallelCopyChunkBoundaries			chunk_boundaries; /* chunk array */
+} ParallelCopyShmInfo;
+
+/*
+ * Parallel copy line buffer information.
+ */
+typedef struct ParallelCopyLineBuf
+{
+	StringInfoData		line_buf;
+	uint64				cur_lineno;	/* line number for error messages */
+}ParallelCopyLineBuf;
+
+/*
+ * Parallel copy data information.
+ */
+typedef struct ParallelCopyData
+{
+	Oid					relid;				/* relation id of the table */
+	ParallelCopyShmInfo	*pcshared_info;		/* common info in shared memory */
+	bool				is_leader;
+
+	/* chunk position which worker is processing */
+	uint32				worker_processed_pos;
+
+	/*
+	 * Local line_buf array; workers copy chunk data here and release the chunks
+	 * so that the leader can continue.
+	 */
+	ParallelCopyLineBuf	worker_line_buf[WORKER_CHUNK_COUNT];
+	uint32				worker_line_buf_count; /* Number of lines */
+
+	/* Current position in worker_line_buf */
+	uint32				worker_line_buf_pos;
+}ParallelCopyData;
+
+/*
  * This struct contains all the state variables used throughout a COPY
  * operation. For simplicity, we use the same struct for all variants of COPY,
  * even though some fields are used in only some cases.
@@ -225,8 +343,66 @@ typedef struct CopyStateData
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
+	int			nworkers;
+	bool		is_parallel;
+	ParallelCopyData *pcdata;
 } CopyStateData;
 
+/*
+ * This structure holds the common data from CopyStateData that is required by
+ * the workers. This information is allocated and stored in the DSM for the
+ * workers to retrieve and copy into their own CopyStateData.
+ */
+typedef struct ParallelCopyCommonKeyData
+{
+	/* low-level state data */
+	CopyDest            copy_dest;		/* type of copy source/destination */
+	int                 file_encoding;	/* file or remote side's character encoding */
+	bool                need_transcoding;	/* file encoding diff from server? */
+	bool                encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+
+	/* parameters from the COPY command */
+	bool                csv_mode;		/* Comma Separated Value format? */
+	bool                header_line;	/* CSV header line? */
+	int                 null_print_len; /* length of same */
+	bool                force_quote_all;	/* FORCE_QUOTE *? */
+	bool                convert_selectively;	/* do selective binary conversion? */
+
+	/* Working state for COPY FROM */
+	AttrNumber          num_defaults;
+	Oid                 relid;
+}ParallelCopyCommonKeyData;
+
+/*
+ * This structure helps in converting a List into a flat format: count holds
+ * the number of elements in the list and info holds the List elements appended
+ * contiguously. The converted structure is allocated in shared memory and
+ * stored in the DSM for the workers to retrieve and later convert back into a
+ * List.
+ */
+typedef struct ParallelCopyKeyListInfo
+{
+	int	count;		/* count of attributes */
+
+	/* string info in the form info followed by info1, info2... infon  */
+	char    info[1];
+} ParallelCopyKeyListInfo;
+
+/*
+ * List of internal parallel copy function pointers.
+ */
+static const struct
+{
+	char *fn_name;
+	copy_data_source_cb fn_addr;
+}			InternalParallelCopyFuncPtrs[] =
+
+{
+	{
+		"copy_read_data", copy_read_data
+	},
+};
+
 /* DestReceiver for COPY (query) TO */
 typedef struct
 {
@@ -256,6 +432,23 @@ typedef struct
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+/* DSM keys for parallel copy.  */
+#define PARALLEL_COPY_KEY_SHARED_INFO		    1
+#define PARALLEL_COPY_KEY_CSTATE				2
+#define PARALLEL_COPY_KEY_NULL_PRINT			3
+#define PARALLEL_COPY_KEY_NULL_PRINT_CLIENT		4
+#define PARALLEL_COPY_KEY_DELIM					5
+#define PARALLEL_COPY_KEY_QUOTE					6
+#define PARALLEL_COPY_KEY_ESCAPE				7
+#define PARALLEL_COPY_KEY_ATTNAME_LIST			8
+#define PARALLEL_COPY_KEY_FORCE_QUOTE_LIST  	9
+#define PARALLEL_COPY_KEY_NOT_NULL_LIST		   10
+#define PARALLEL_COPY_KEY_NULL_LIST			   11
+#define PARALLEL_COPY_KEY_CONVERT_LIST		   12
+#define PARALLEL_COPY_KEY_DATASOURCE_CB		   13
+#define PARALLEL_COPY_KEY_WHERE_CLAUSE_STR     14
+#define PARALLEL_COPY_KEY_RANGE_TABLE		   15
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
@@ -477,10 +670,588 @@ static bool CopyGetInt32(CopyState cstate, int32 *val);
 static void CopySendInt16(CopyState cstate, int16 val);
 static bool CopyGetInt16(CopyState cstate, int16 *val);
 
+static pg_attribute_always_inline void EndParallelCopy(ParallelContext *pcxt);
 static void PopulateAttributes(CopyState cstate, TupleDesc	tup_desc,
 							   List *attnamelist);
+static pg_attribute_always_inline copy_data_source_cb LookupParallelCopyFnPtr(const char *funcname);
+static pg_attribute_always_inline char* LookupParallelCopyFnStr(copy_data_source_cb fn_addr);
 
 /*
+ * CopyCommonInfoForWorker - Copy shared_cstate using cstate information.
+ */
+static pg_attribute_always_inline void
+CopyCommonInfoForWorker(CopyState cstate, ParallelCopyCommonKeyData *shared_cstate)
+{
+	shared_cstate->copy_dest = cstate->copy_dest;
+	shared_cstate->file_encoding = cstate->file_encoding;
+	shared_cstate->need_transcoding = cstate->need_transcoding;
+	shared_cstate->encoding_embeds_ascii = cstate->encoding_embeds_ascii;
+	shared_cstate->csv_mode = cstate->csv_mode;
+	shared_cstate->header_line = cstate->header_line;
+	shared_cstate->null_print_len = cstate->null_print_len;
+	shared_cstate->force_quote_all = cstate->force_quote_all;
+	shared_cstate->convert_selectively = cstate->convert_selectively;
+	shared_cstate->num_defaults = cstate->num_defaults;
+	shared_cstate->relid = cstate->pcdata->relid;
+}
+
+/*
+ * RetrieveSharedString - Retrieve the string from shared memory.
+ */
+static void
+RetrieveSharedString(shm_toc *toc, int sharedkey, char **copystr)
+{
+	char *shared_str_val = (char *)shm_toc_lookup(toc, sharedkey, true);
+	if (shared_str_val)
+		*copystr = pstrdup(shared_str_val);
+}
+
+/*
+ * RetrieveSharedList - Retrieve the list from shared memory.
+ */
+static void
+RetrieveSharedList(shm_toc *toc, int sharedkey, List **copylist)
+{
+	ParallelCopyKeyListInfo *listinformation = (ParallelCopyKeyListInfo *)shm_toc_lookup(toc, sharedkey,
+														   true);
+	if (listinformation)
+	{
+		int	length  = 0;
+		int count;
+
+		for (count = 0; count < listinformation->count; count++)
+		{
+			char *attname = (char *)(listinformation->info + length);
+			length += strlen(attname) + 1;
+			*copylist = lappend(*copylist, makeString(attname));
+		}
+	}
+}
+
+/*
+ * CopyListSharedMemory - Copy the list into shared memory.
+ */
+static void
+CopyListSharedMemory(List *inputlist, Size memsize, ParallelCopyKeyListInfo *sharedlistinfo)
+{
+	ListCell   *l;
+	int			length  = 0;
+
+	MemSet(sharedlistinfo, 0, memsize);
+	foreach(l, inputlist)
+	{
+		char	   *name = strVal(lfirst(l));
+		memcpy((char *)(sharedlistinfo->info + length), name, strlen(name));
+		sharedlistinfo->count++;
+		length += strlen(name) + 1;
+	}
+}
+
+/*
+ * ComputeListSize - compute the list size.
+ */
+static int
+ComputeListSize(List *inputlist)
+{
+	int est_size = sizeof(int);
+	if (inputlist != NIL)
+	{
+		ListCell   *l;
+		foreach(l, inputlist)
+			est_size += strlen(strVal(lfirst(l))) + 1;
+	}
+
+	return est_size;
+}
+
+/*
+ * EstimateChunkKeysStr - Estimate the size required in shared memory for the
+ * input string.
+ */
+static void
+EstimateChunkKeysStr(ParallelContext *pcxt, char *inputstr)
+{
+	if (inputstr)
+	{
+		shm_toc_estimate_chunk(&pcxt->estimator, strlen(inputstr) + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+}
+
+/*
+ * EstimateChunkKeysList - Estimate the size required in shared memory for the
+ * input list.
+ */
+static void
+EstimateChunkKeysList(ParallelContext *pcxt, List *inputlist,
+		Size *est_list_size)
+{
+	if (inputlist != NIL)
+	{
+		*est_list_size = ComputeListSize(inputlist);
+		shm_toc_estimate_chunk(&pcxt->estimator, *est_list_size);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+}
+
+/*
+ * InsertStringShm - Insert a string into shared memory.
+ */
+static void
+InsertStringShm(ParallelContext *pcxt, int key, char *inputstr)
+{
+	if (inputstr)
+	{
+		char *shmptr = (char *)shm_toc_allocate(pcxt->toc,
+												strlen(inputstr) + 1);
+		strcpy(shmptr, inputstr);
+		shm_toc_insert(pcxt->toc, key, shmptr);
+	}
+}
+
+/*
+ * InsertListShm - Insert a list into shared memory.
+ */
+static void
+InsertListShm(ParallelContext *pcxt, int key, List *inputlist,
+			  Size est_list_size)
+{
+	if (inputlist != NIL)
+	{
+		ParallelCopyKeyListInfo *sharedlistinfo = (ParallelCopyKeyListInfo *)shm_toc_allocate(pcxt->toc,
+																est_list_size);
+		CopyListSharedMemory(inputlist, est_list_size, sharedlistinfo);
+		shm_toc_insert(pcxt->toc, key, sharedlistinfo);
+	}
+}
+
+/*
+ * BeginParallelCopy - start parallel copy tasks.
+ *
+ * Determine the number of workers to use for the parallel copy, initialize the
+ * data structures required by the workers, estimate the size required in the
+ * DSM, and load the necessary keys into the DSM. The specified number of
+ * workers will then be launched.
+ */
+static ParallelContext*
+BeginParallelCopy(int nworkers, CopyState cstate, List *attnamelist, Oid relid)
+{
+	ParallelContext *pcxt;
+	ParallelCopyShmInfo *shared_info_ptr;
+	ParallelCopyCommonKeyData *shared_cstate;
+	FullTransactionId full_transaction_id;
+	Size est_shared_info;
+	Size est_cstateshared;
+	Size est_att_list_size;
+	Size est_quote_list_size;
+	Size est_notnull_list_size;
+	Size est_null_list_size;
+	Size est_convert_list_size;
+	Size est_datasource_cb_size;
+	int  count = 0;
+	char *whereClauseStr = NULL;
+	char *rangeTableStr = NULL;
+	int  parallel_workers = 0;
+	ParallelCopyData *pcdata;
+
+	parallel_workers = Min(nworkers, max_worker_processes);
+
+	/* Can't perform copy in parallel */
+	if (parallel_workers <= 0)
+		return NULL;
+
+	pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData));
+	cstate->pcdata = pcdata;
+
+	EnterParallelMode();
+	pcxt = CreateParallelContext("postgres", "ParallelCopyMain",
+								 parallel_workers);
+	Assert(pcxt->nworkers > 0);
+
+	/*
+	 * Estimate size for shared information for PARALLEL_COPY_KEY_SHARED_INFO
+	 */
+	est_shared_info = sizeof(ParallelCopyShmInfo);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_info);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate the size for shared information for PARALLEL_COPY_KEY_CSTATE */
+	est_cstateshared = 	MAXALIGN(sizeof(ParallelCopyCommonKeyData));
+	shm_toc_estimate_chunk(&pcxt->estimator, est_cstateshared);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	EstimateChunkKeysStr(pcxt, cstate->null_print);
+	EstimateChunkKeysStr(pcxt, cstate->null_print_client);
+	EstimateChunkKeysStr(pcxt, cstate->delim);
+	EstimateChunkKeysStr(pcxt, cstate->quote);
+	EstimateChunkKeysStr(pcxt, cstate->escape);
+
+	if (cstate->whereClause != NULL)
+	{
+		whereClauseStr = nodeToString(cstate->whereClause);
+		EstimateChunkKeysStr(pcxt, whereClauseStr);
+	}
+
+	if (cstate->range_table != NULL)
+	{
+		rangeTableStr = nodeToString(cstate->range_table);
+		EstimateChunkKeysStr(pcxt, rangeTableStr);
+	}
+
+	/* Estimate the size for shared information for PARALLEL_COPY_KEY_XID. */
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FullTransactionId));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/*
+	 * Estimate the size for shared information for
+	 * PARALLEL_COPY_KEY_ATTNAME_LIST.
+	 */
+	EstimateChunkKeysList(pcxt, attnamelist, &est_att_list_size);
+
+	/*
+	 * Estimate the size for shared information for
+	 * PARALLEL_COPY_KEY_FORCE_QUOTE_LIST.
+	 */
+	EstimateChunkKeysList(pcxt, cstate->force_quote, &est_quote_list_size);
+
+	/*
+	 * Estimate the size for shared information for
+	 * PARALLEL_COPY_KEY_NOT_NULL_LIST.
+	 */
+	EstimateChunkKeysList(pcxt, cstate->force_notnull,
+							 &est_notnull_list_size);
+
+	/*
+	 * Estimate the size for shared information for
+	 * PARALLEL_COPY_KEY_NULL_LIST.
+	 */
+	EstimateChunkKeysList(pcxt, cstate->force_null, &est_null_list_size);
+
+	/*
+	 * Estimate the size for shared information for
+	 * PARALLEL_COPY_KEY_CONVERT_LIST.
+	 */
+	EstimateChunkKeysList(pcxt, cstate->convert_select,
+							 &est_convert_list_size);
+
+	/*
+	 * Estimate the size for shared information for
+	 * PARALLEL_COPY_KEY_DATASOURCE_CB.
+	 */
+	if (cstate->data_source_cb)
+	{
+		char *functionname = LookupParallelCopyFnStr(cstate->data_source_cb);
+		est_datasource_cb_size = strlen(functionname) + 1;
+		shm_toc_estimate_chunk(&pcxt->estimator, est_datasource_cb_size);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
+	InitializeParallelDSM(pcxt);
+
+	/* If no DSM segment was available, back out (do serial copy) */
+	if (pcxt->seg == NULL)
+	{
+		EndParallelCopy(pcxt);
+		return NULL;
+	}
+
+	/* Allocate shared memory for PARALLEL_COPY_KEY_SHARED_INFO */
+	shared_info_ptr = (ParallelCopyShmInfo *) shm_toc_allocate(pcxt->toc,
+													   est_shared_info);
+	MemSet(shared_info_ptr, 0, est_shared_info);
+	shared_info_ptr->is_read_in_progress = true;
+	shared_info_ptr->cur_block_pos = -1;
+	shared_info_ptr->full_transaction_id = full_transaction_id;
+	shared_info_ptr->mycid = GetCurrentCommandId(true);
+	for (count = 0; count < RINGSIZE; count++)
+	{
+		ParallelCopyChunkBoundary *chunkInfo = &shared_info_ptr->chunk_boundaries.ring[count];
+		pg_atomic_init_u32(&(chunkInfo->chunk_size), -1);
+	}
+
+	shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_SHARED_INFO, shared_info_ptr);
+	pcdata->pcshared_info = shared_info_ptr;
+	pcdata->relid = relid;
+
+	/* Store shared build state, for which we reserved space. */
+	shared_cstate = (ParallelCopyCommonKeyData *)shm_toc_allocate(pcxt->toc,
+															est_cstateshared);
+
+	/* copy cstate variables. */
+	CopyCommonInfoForWorker(cstate, shared_cstate);
+	shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_CSTATE, shared_cstate);
+
+	InsertStringShm(pcxt, PARALLEL_COPY_KEY_NULL_PRINT, cstate->null_print);
+	InsertStringShm(pcxt, PARALLEL_COPY_KEY_NULL_PRINT_CLIENT,
+					  cstate->null_print_client);
+	InsertStringShm(pcxt, PARALLEL_COPY_KEY_DELIM, cstate->delim);
+	InsertStringShm(pcxt, PARALLEL_COPY_KEY_QUOTE, cstate->quote);
+	InsertStringShm(pcxt, PARALLEL_COPY_KEY_ESCAPE, cstate->escape);
+
+	InsertListShm(pcxt, PARALLEL_COPY_KEY_ATTNAME_LIST,
+					attnamelist, est_att_list_size);
+	InsertListShm(pcxt, PARALLEL_COPY_KEY_FORCE_QUOTE_LIST,
+					cstate->force_quote, est_quote_list_size);
+	InsertListShm(pcxt, PARALLEL_COPY_KEY_NOT_NULL_LIST,
+					cstate->force_notnull, est_notnull_list_size);
+	InsertListShm(pcxt, PARALLEL_COPY_KEY_NULL_LIST, cstate->force_null,
+					est_null_list_size);
+	InsertListShm(pcxt, PARALLEL_COPY_KEY_CONVERT_LIST,
+					cstate->convert_select, est_convert_list_size);
+
+	if (cstate->data_source_cb)
+	{
+		char *functionname = LookupParallelCopyFnStr(cstate->data_source_cb);
+		char *data_source_cb = (char *) shm_toc_allocate(pcxt->toc,
+														 est_datasource_cb_size);
+		strcpy(data_source_cb, functionname);
+		shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_DATASOURCE_CB,
+					   data_source_cb);
+	}
+
+	if (cstate->whereClause)
+		InsertStringShm(pcxt, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR,
+						whereClauseStr);
+
+	if(cstate->range_table)
+		InsertStringShm(pcxt, PARALLEL_COPY_KEY_RANGE_TABLE, rangeTableStr);
+
+	LaunchParallelWorkers(pcxt);
+	if (pcxt->nworkers_launched == 0)
+	{
+		EndParallelCopy(pcxt);
+		return NULL;
+	}
+
+	/*
+	 * Caller needs to wait for all launched workers when we return.  Make sure
+	 * that the failure-to-start case will not hang forever.
+	 */
+	WaitForParallelWorkersToAttach(pcxt);
+
+	pcdata->is_leader = true;
+	cstate->is_parallel = true;
+	return pcxt;
+}
+
+/*
+ * EndParallelCopy - end the parallel copy tasks.
+ */
+static pg_attribute_always_inline void
+EndParallelCopy(ParallelContext *pcxt)
+{
+	Assert(!IsParallelWorker());
+
+	DestroyParallelContext(pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * ParallelWorkerInitialization - Initialize parallel worker.
+ */
+static void
+ParallelWorkerInitialization(ParallelCopyCommonKeyData *shared_cstate,
+		CopyState cstate, List *attnamelist)
+{
+	uint32		count;
+	ParallelCopyData *pcdata = cstate->pcdata;
+	TupleDesc	tup_desc = RelationGetDescr(cstate->rel);
+
+	cstate->copy_dest = shared_cstate->copy_dest;
+	cstate->file_encoding = shared_cstate->file_encoding;
+	cstate->need_transcoding = shared_cstate->need_transcoding;
+	cstate->encoding_embeds_ascii = shared_cstate->encoding_embeds_ascii;
+	cstate->csv_mode = shared_cstate->csv_mode;
+	cstate->header_line = shared_cstate->header_line;
+	cstate->null_print_len = shared_cstate->null_print_len;
+	cstate->force_quote_all = shared_cstate->force_quote_all;
+	cstate->convert_selectively = shared_cstate->convert_selectively;
+	cstate->num_defaults = shared_cstate->num_defaults;
+	pcdata->relid = shared_cstate->relid;
+
+	PopulateAttributes(cstate, tup_desc, attnamelist);
+
+	/* Initialize state variables. */
+	cstate->reached_eof = false;
+	cstate->eol_type = EOL_UNKNOWN;
+	cstate->cur_relname = RelationGetRelationName(cstate->rel);
+	cstate->cur_lineno = 0;
+	cstate->cur_attname = NULL;
+	cstate->cur_attval = NULL;
+
+	/* Set up variables to avoid per-attribute overhead. */
+	initStringInfo(&cstate->attribute_buf);
+
+	initStringInfo(&cstate->line_buf);
+	for (count = 0; count < WORKER_CHUNK_COUNT;count++)
+		initStringInfo(&pcdata->worker_line_buf[count].line_buf);
+
+	cstate->line_buf_converted = false;
+	cstate->raw_buf = NULL;
+	cstate->raw_buf_index = cstate->raw_buf_len = 0;
+}
+/*
+ * ParallelCopyMain - parallel copy worker's code.
+ *
+ * Each worker handles the WHERE clause, converts the tuple to columns, adds
+ * default/null values for columns missing in the record, finds the partition
+ * if the table is partitioned, invokes BEFORE ROW INSERT triggers, handles
+ * constraints and inserts the tuples.
+ */
+void
+ParallelCopyMain(dsm_segment *seg, shm_toc *toc)
+{
+	CopyState cstate;
+	ParallelCopyData *pcdata;
+	ParallelCopyShmInfo *pcshared_info;
+	ParallelCopyCommonKeyData *shared_cstate;
+	Relation	rel = NULL;
+	MemoryContext oldcontext;
+	List *attlist = NIL;
+	char *data_source_cb;
+	char *whereClauseStr = NULL;
+	char *rangeTableStr = NULL;
+
+	/* Allocate workspace and zero all fields. */
+	cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
+	pcdata = (ParallelCopyData *) palloc0(sizeof(ParallelCopyData));
+	cstate->pcdata = pcdata;
+	pcdata->is_leader = false;
+	pcdata->worker_processed_pos = -1;
+	cstate->is_parallel = true;
+
+	/*
+	 * We allocate everything used by a cstate in a new memory context. This
+	 * avoids memory leaks during repeated use of COPY in a query.
+	 */
+	cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
+												"COPY",
+												ALLOCSET_DEFAULT_SIZES);
+	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
+	pcshared_info = (ParallelCopyShmInfo *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_SHARED_INFO,
+														false);
+
+	ereport(DEBUG1, (errmsg("Starting parallel copy worker")));
+
+	pcdata->pcshared_info = pcshared_info;
+
+	shared_cstate = (ParallelCopyCommonKeyData *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_CSTATE,
+														   false);
+
+	cstate->null_print = (char *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_NULL_PRINT,
+												true);
+	cstate->null_print_client = (char *)shm_toc_lookup(toc, PARALLEL_COPY_KEY_NULL_PRINT_CLIENT,
+													   true);
+
+	RetrieveSharedString(toc, PARALLEL_COPY_KEY_DELIM, &cstate->delim);
+	RetrieveSharedString(toc, PARALLEL_COPY_KEY_QUOTE, &cstate->quote);
+	RetrieveSharedString(toc, PARALLEL_COPY_KEY_ESCAPE, &cstate->escape);
+
+	RetrieveSharedList(toc, PARALLEL_COPY_KEY_ATTNAME_LIST, &attlist);
+	RetrieveSharedList(toc, PARALLEL_COPY_KEY_FORCE_QUOTE_LIST, &cstate->force_quote);
+	RetrieveSharedList(toc, PARALLEL_COPY_KEY_NOT_NULL_LIST, &cstate->force_notnull);
+	RetrieveSharedList(toc, PARALLEL_COPY_KEY_NULL_LIST, &cstate->force_null);
+	RetrieveSharedList(toc, PARALLEL_COPY_KEY_CONVERT_LIST, &cstate->convert_select);
+	data_source_cb = (char *)shm_toc_lookup(toc,
+											PARALLEL_COPY_KEY_DATASOURCE_CB,
+											true);
+	RetrieveSharedString(toc, PARALLEL_COPY_KEY_WHERE_CLAUSE_STR, &whereClauseStr);
+	RetrieveSharedString(toc, PARALLEL_COPY_KEY_RANGE_TABLE, &rangeTableStr);
+
+	if (data_source_cb)
+		cstate->data_source_cb = LookupParallelCopyFnPtr(data_source_cb);
+
+	if (whereClauseStr)
+	{
+		Node *whereClauseCnvrtdFrmStr = (Node *) stringToNode(whereClauseStr);
+		cstate->whereClause = whereClauseCnvrtdFrmStr;
+	}
+
+	if (rangeTableStr)
+	{
+		List *rangeTableCnvrtdFrmStr = (List *) stringToNode(rangeTableStr);
+		cstate->range_table = rangeTableCnvrtdFrmStr;
+	}
+
+	/* Open and lock the relation, using the appropriate lock type. */
+	rel = table_open(shared_cstate->relid, RowExclusiveLock);
+	cstate->rel = rel;
+	ParallelWorkerInitialization(shared_cstate, cstate, attlist);
+
+	CopyFrom(cstate);
+
+	if (rel != NULL)
+		table_close(rel, RowExclusiveLock);
+
+	MemoryContextSwitchTo(oldcontext);
+	return;
+}
+/*
+ * ParallelCopyLeader - parallel copy leader's functionality.
+ *
+ * The leader reads the table data from the file/stdin and loads it block by
+ * block into the shared data blocks. It then scans this data to identify
+ * chunks based on line breaks and records each chunk's boundary in a
+ * ParallelCopyChunkBoundary entry, from which the workers pick up chunks and
+ * insert them into the table. If no free chunk entry is available, the leader
+ * waits until one is freed by a worker. The leader also executes the BEFORE
+ * STATEMENT triggers, if any are present. This process is repeated until the
+ * complete file is processed, after which the leader waits for all populated
+ * chunks to be processed by the workers and then exits.
+ */
+static void
+ParallelCopyLeader(CopyState cstate)
+{
+	ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
+	ereport(DEBUG1, (errmsg("Running parallel copy leader")));
+
+	pcshared_info->is_read_in_progress = false;
+	cstate->cur_lineno = 0;
+}
+/*
+ * LookupParallelCopyFnPtr - Look up parallel copy function pointer.
+ */
+static pg_attribute_always_inline copy_data_source_cb
+LookupParallelCopyFnPtr(const char *funcname)
+{
+	int			i;
+
+	for (i = 0; i < lengthof(InternalParallelCopyFuncPtrs); i++)
+	{
+		if (strcmp(InternalParallelCopyFuncPtrs[i].fn_name, funcname) == 0)
+			return InternalParallelCopyFuncPtrs[i].fn_addr;
+	}
+
+	/* We can only reach this by programming error. */
+	elog(ERROR, "internal function \"%s\" not found", funcname);
+}
+
+/*
+ * LookupParallelCopyFnStr - Lookup function string from a function pointer.
+ */
+static pg_attribute_always_inline char*
+LookupParallelCopyFnStr(copy_data_source_cb fn_addr)
+{
+	int			i;
+
+	for (i = 0; i < lengthof(InternalParallelCopyFuncPtrs); i++)
+	{
+		if (InternalParallelCopyFuncPtrs[i].fn_addr == fn_addr)
+			return InternalParallelCopyFuncPtrs[i].fn_name;
+	}
+
+	/* We can only reach this by programming error. */
+	elog(ERROR, "internal function pointer not found");
+}
+/*
  * Send copy start/stop messages for frontend copies.  These have changed
  * in past protocol redesigns.
  */
@@ -1146,6 +1917,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 
 	if (is_from)
 	{
+		ParallelContext *pcxt = NULL;
 		Assert(rel);
 
 		/* check read-only transaction and parallel mode */
@@ -1155,7 +1927,24 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
 							   NULL, stmt->attlist, stmt->options);
 		cstate->whereClause = whereClause;
-		*processed = CopyFrom(cstate);	/* copy from file to database */
+		cstate->is_parallel = false;
+
+		if (cstate->nworkers > 0)
+			pcxt = BeginParallelCopy(cstate->nworkers, cstate, stmt->attlist,
+									 relid);
+
+		if (pcxt)
+		{
+			ParallelCopyLeader(cstate);
+
+			/* Wait for all copy workers to finish */
+			WaitForParallelWorkersToFinish(pcxt);
+			*processed = pg_atomic_read_u64(&cstate->pcdata->pcshared_info->processed);
+			EndParallelCopy(pcxt);
+		}
+		else
+			*processed = CopyFrom(cstate); /* copy from file to database */
+
 		EndCopyFrom(cstate);
 	}
 	else
@@ -1204,6 +1993,7 @@ ProcessCopyOptions(ParseState *pstate,
 	cstate->is_copy_from = is_from;
 
 	cstate->file_encoding = -1;
+	cstate->nworkers = -1;
 
 	/* Extract options from the statement node tree */
 	foreach(option, options)
@@ -1372,6 +2162,26 @@ ProcessCopyOptions(ParseState *pstate,
 								defel->defname),
 						 parser_errposition(pstate, defel->location)));
 		}
+		else if (strcmp(defel->defname, "parallel") == 0)
+		{
+			if (!cstate->is_copy_from)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("parallel option supported only for copy from"),
+						 parser_errposition(pstate, defel->location)));
+			if (cstate->nworkers >= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->nworkers = atoi(defGetString(defel));
+			if (cstate->nworkers < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("argument to option \"%s\" must be a non-negative integer",
+								defel->defname),
+						 parser_errposition(pstate, defel->location)));
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c27d970..b3787c1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -557,7 +557,7 @@ make_copy_attnamelist(LogicalRepRelMapEntry *rel)
  * Data source callback for the COPY FROM, which reads from the remote
  * connection and passes the data back to our local COPY.
  */
-static int
+int
 copy_read_data(void *outbuf, int minread, int maxread)
 {
 	int			bytesread = 0;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833..5dc95ac 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -14,6 +14,7 @@
 #ifndef COPY_H
 #define COPY_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
@@ -41,4 +42,7 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern void ParallelCopyMain(dsm_segment *seg, shm_toc *toc);
+
+extern int copy_read_data(void *outbuf, int minread, int maxread);
 #endif							/* COPY_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c65a552..3373894 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1698,6 +1698,14 @@ ParallelBitmapHeapState
 ParallelBlockTableScanDesc
 ParallelCompletionPtr
 ParallelContext
+ParallelCopyChunkBoundaries
+ParallelCopyChunkBoundary
+ParallelCopyCommonKeyData
+ParallelCopyData
+ParallelCopyDataBlock
+ParallelCopyKeyListInfo
+ParallelCopyLineBuf
+ParallelCopyShmInfo
 ParallelExecutorInfo
 ParallelHashGrowth
 ParallelHashJoinBatch
-- 
1.8.3.1
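
The attribute-name and force-quote/null lists above are shared with the
workers through the DSM using the ParallelCopyKeyListInfo layout: a count
followed by the list elements appended contiguously, each NUL-terminated
(CopyListSharedMemory() packs the list, RetrieveSharedList() walks it back
into a List). A standalone sketch of that layout, with illustrative attribute
names and plain calloc() instead of shm_toc allocation (not the patch's code):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
	int		count;				/* number of strings that follow */
	char	info[1];			/* strings appended back to back */
} KeyListInfo;

/* Pack an array of strings into a freshly allocated KeyListInfo. */
static KeyListInfo *
pack_list(const char **items, int nitems)
{
	size_t		size = sizeof(int);
	size_t		off = 0;
	KeyListInfo *dst;

	for (int i = 0; i < nitems; i++)
		size += strlen(items[i]) + 1;

	dst = calloc(1, size);
	for (int i = 0; i < nitems; i++)
	{
		strcpy(dst->info + off, items[i]);
		off += strlen(items[i]) + 1;
		dst->count++;
	}
	return dst;
}

int
main(void)
{
	const char *attnames[] = {"id", "name", "price"};
	KeyListInfo *packed = pack_list(attnames, 3);
	size_t		off = 0;

	/* Unpack: walk the contiguous strings, as RetrieveSharedList() does. */
	for (int i = 0; i < packed->count; i++)
	{
		printf("attribute %d: %s\n", i, packed->info + off);
		off += strlen(packed->info + off) + 1;
	}
	free(packed);
	return 0;
}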

From a45985a66ead7f31e6f885b8406da14f261d7021 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Wed, 10 Jun 2020 07:21:10 +0530
Subject: [PATCH 4/4] Documentation for parallel copy.

This patch adds the documentation changes for parallel copy.
---
 doc/src/sgml/ref/copy.sgml | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 18189ab..95d349d 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -37,6 +37,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>'
     NULL '<replaceable class="parameter">null_string</replaceable>'
     HEADER [ <replaceable class="parameter">boolean</replaceable> ]
+    PARALLEL <replaceable class="parameter">integer</replaceable>
     QUOTE '<replaceable class="parameter">quote_character</replaceable>'
     ESCAPE '<replaceable class="parameter">escape_character</replaceable>'
     FORCE_QUOTE { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
@@ -275,6 +276,21 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
    </varlistentry>
 
    <varlistentry>
+    <term><literal>PARALLEL</literal></term>
+    <listitem>
+     <para>
+      Perform <command>COPY FROM</command> in parallel using <replaceable
+      class="parameter">integer</replaceable> background workers.  Note that
+      it is not guaranteed that the number of parallel workers specified in
+      <replaceable class="parameter">integer</replaceable> will be used during
+      execution; the copy may run with fewer workers than specified, or even
+      with no workers at all.  This option is allowed only with
+      <command>COPY FROM</command>.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>QUOTE</literal></term>
     <listitem>
      <para>
-- 
1.8.3.1
