I tried to prepare a patch for these TODO items: - Have COPY return the number of rows loaded/unloaded? - Update [pg_dump and] psql to use the new COPY libpq API.
Added an "uint64 processed" to "struct CopyStateData". It's incremented each time on a tuple send/receive made by CopyTo/CopyFrom. Collected result is added to COPY command's commandTag which can be gathered by PQcmdStatus(). (Also updated PQcmdTuples() to work with it.) When I tried to modify psql to print the COPY's commandTag, I found that its implementation is really disorganized when we still use old COPY commands. Thus replaced old COPY routines with the new ones. (IMHO, modified design for the psql's COPY should work faster than the previous. Because, in this patch, I don't read input one by one with getc(). Just filled the buffer with fgets and made \r search only in the first and last lines.) Regards. -- "We are the middle children of history, raised by television to believe that someday we'll be millionaires and movie stars and rock stars, but we won't. And we're just learning this fact," Tyler said. "So don't fuck with us."
? src/Makefile.global ? src/cscope.out ? src/backend/cscope.out ? src/backend/postgres ? src/backend/catalog/postgres.bki ? src/backend/catalog/postgres.description ? src/backend/commands/cscope.out ? src/backend/executor/cscope.out ? src/backend/libpq/cscope.out ? src/backend/utils/mb/conversion_procs/conversion_create.sql ? src/backend/utils/mb/conversion_procs/ascii_and_mic/libascii_and_mic.so.0.0 ? src/backend/utils/mb/conversion_procs/cyrillic_and_mic/libcyrillic_and_mic.so.0.0 ? src/backend/utils/mb/conversion_procs/euc_cn_and_mic/libeuc_cn_and_mic.so.0.0 ? src/backend/utils/mb/conversion_procs/euc_jp_and_sjis/libeuc_jp_and_sjis.so.0.0 ? src/backend/utils/mb/conversion_procs/euc_kr_and_mic/libeuc_kr_and_mic.so.0.0 ? src/backend/utils/mb/conversion_procs/euc_tw_and_big5/libeuc_tw_and_big5.so.0.0 ? src/backend/utils/mb/conversion_procs/latin2_and_win1250/liblatin2_and_win1250.so.0.0 ? src/backend/utils/mb/conversion_procs/latin_and_mic/liblatin_and_mic.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_ascii/libutf8_and_ascii.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_big5/libutf8_and_big5.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_cyrillic/libutf8_and_cyrillic.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_euc_cn/libutf8_and_euc_cn.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_euc_jp/libutf8_and_euc_jp.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_euc_kr/libutf8_and_euc_kr.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_euc_tw/libutf8_and_euc_tw.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_gb18030/libutf8_and_gb18030.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_gbk/libutf8_and_gbk.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_iso8859/libutf8_and_iso8859.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_iso8859_1/libutf8_and_iso8859_1.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_johab/libutf8_and_johab.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_sjis/libutf8_and_sjis.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_uhc/libutf8_and_uhc.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_win1250/libutf8_and_win1250.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_win1252/libutf8_and_win1252.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_win1256/libutf8_and_win1256.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_win1258/libutf8_and_win1258.so.0.0 ? src/backend/utils/mb/conversion_procs/utf8_and_win874/libutf8_and_win874.so.0.0 ? src/bin/initdb/initdb ? src/bin/ipcclean/ipcclean ? src/bin/pg_config/pg_config ? src/bin/pg_controldata/pg_controldata ? src/bin/pg_ctl/pg_ctl ? src/bin/pg_dump/pg_dump ? src/bin/pg_dump/pg_dumpall ? src/bin/pg_dump/pg_restore ? src/bin/pg_resetxlog/pg_resetxlog ? src/bin/psql/cscope.out ? src/bin/psql/psql ? src/bin/scripts/clusterdb ? src/bin/scripts/createdb ? src/bin/scripts/createlang ? src/bin/scripts/createuser ? src/bin/scripts/dropdb ? src/bin/scripts/droplang ? src/bin/scripts/dropuser ? src/bin/scripts/reindexdb ? src/bin/scripts/vacuumdb ? src/include/pg_config.h ? src/include/stamp-h ? src/interfaces/ecpg/compatlib/libecpg_compat.so.2.2 ? src/interfaces/ecpg/ecpglib/libecpg.so.5.3 ? src/interfaces/ecpg/pgtypeslib/libpgtypes.so.2.2 ? src/interfaces/ecpg/preproc/ecpg ? src/interfaces/libpq/cscope.out ? src/interfaces/libpq/libpq.so.4.2 ? src/pl/plpgsql/src/libplpgsql.so.1.0 ? src/port/pg_config_paths.h ? src/test/regress/libregress.so.0.0 ? src/test/regress/pg_regress ? src/test/regress/expected/constraints.out ? src/test/regress/expected/copy.out ? src/test/regress/expected/create_function_1.out ? src/test/regress/expected/create_function_2.out ? src/test/regress/expected/misc.out ? src/test/regress/expected/tablespace.out ? src/test/regress/sql/constraints.sql ? src/test/regress/sql/copy.sql ? src/test/regress/sql/create_function_1.sql ? src/test/regress/sql/create_function_2.sql ? src/test/regress/sql/misc.sql ? src/test/regress/sql/tablespace.sql ? src/timezone/zic Index: src/backend/commands/copy.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/commands/copy.c,v retrieving revision 1.255 diff -u -c -r1.255 copy.c *** src/backend/commands/copy.c 22 Nov 2005 18:17:08 -0000 1.255 --- src/backend/commands/copy.c 18 Dec 2005 09:48:27 -0000 *************** *** 102,107 **** --- 102,108 ---- int client_encoding; /* remote side's character encoding */ bool need_transcoding; /* client encoding diff from server? */ bool client_only_encoding; /* encoding not valid on server? */ + uint64 processed; /* # of tuples processed */ /* parameters from the COPY command */ Relation rel; /* relation to copy to or from */ *************** *** 646,652 **** * Do not allow the copy if user doesn't have proper permission to access * the table. */ ! void DoCopy(const CopyStmt *stmt) { CopyState cstate; --- 647,653 ---- * Do not allow the copy if user doesn't have proper permission to access * the table. */ ! uint64 DoCopy(const CopyStmt *stmt) { CopyState cstate; *************** *** 660,665 **** --- 661,667 ---- AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); AclResult aclresult; ListCell *option; + uint64 processed; /* Allocate workspace and zero all fields */ cstate = (CopyStateData *) palloc0(sizeof(CopyStateData)); *************** *** 935,941 **** initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); ! cstate->raw_buf_index = cstate->raw_buf_len = 0; /* Set up encoding conversion info */ cstate->client_encoding = pg_get_client_encoding(); --- 937,943 ---- initStringInfo(&cstate->line_buf); cstate->line_buf_converted = false; cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1); ! cstate->raw_buf_index = cstate->raw_buf_len = cstate->processed = 0; /* Set up encoding conversion info */ cstate->client_encoding = pg_get_client_encoding(); *************** *** 1080,1086 **** --- 1082,1091 ---- pfree(cstate->attribute_buf.data); pfree(cstate->line_buf.data); pfree(cstate->raw_buf); + + processed = cstate->processed; pfree(cstate); + return processed; } *************** *** 1310,1315 **** --- 1315,1322 ---- VARSIZE(outputbytes) - VARHDRSZ); } } + + cstate->processed++; } CopySendEndOfRow(cstate); *************** *** 1916,1921 **** --- 1923,1930 ---- /* AFTER ROW INSERT Triggers */ ExecARInsertTriggers(estate, resultRelInfo, tuple); + + cstate->processed++; } } Index: src/backend/tcop/utility.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/tcop/utility.c,v retrieving revision 1.250 diff -u -c -r1.250 utility.c *** src/backend/tcop/utility.c 29 Nov 2005 01:25:49 -0000 1.250 --- src/backend/tcop/utility.c 18 Dec 2005 09:48:29 -0000 *************** *** 640,646 **** break; case T_CopyStmt: ! DoCopy((CopyStmt *) parsetree); break; case T_PrepareStmt: --- 640,653 ---- break; case T_CopyStmt: ! { ! uint64 processed = DoCopy((CopyStmt *) parsetree); ! char buf[21]; ! ! snprintf(buf, sizeof(buf), UINT64_FORMAT, processed); ! snprintf(completionTag, COMPLETION_TAG_BUFSIZE, ! "COPY %s", buf); ! } break; case T_PrepareStmt: Index: src/bin/psql/common.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/psql/common.c,v retrieving revision 1.111 diff -u -c -r1.111 common.c *** src/bin/psql/common.c 22 Nov 2005 18:17:29 -0000 1.111 --- src/bin/psql/common.c 18 Dec 2005 09:48:31 -0000 *************** *** 844,857 **** * Returns true if the query executed successfully, false otherwise. */ static bool ! ProcessCopyResult(PGresult *results) { ! bool success = false; ! if (!results) return false; ! switch (PQresultStatus(results)) { case PGRES_TUPLES_OK: case PGRES_COMMAND_OK: --- 844,859 ---- * Returns true if the query executed successfully, false otherwise. */ static bool ! ProcessCopyResult(PGresult **results) { ! bool success, req_getres; ! if (!*results) return false; ! success = req_getres = false; ! ! switch (PQresultStatus(*results)) { case PGRES_TUPLES_OK: case PGRES_COMMAND_OK: *************** *** 861,871 **** break; case PGRES_COPY_OUT: ! success = handleCopyOut(pset.db, pset.queryFout); break; case PGRES_COPY_IN: ! success = handleCopyIn(pset.db, pset.cur_cmd_source); break; default: --- 863,873 ---- break; case PGRES_COPY_OUT: ! req_getres = success = handleCopyOut(pset.db, pset.queryFout); break; case PGRES_COPY_IN: ! req_getres = success = handleCopyIn(pset.db, pset.cur_cmd_source); break; default: *************** *** 875,880 **** --- 877,894 ---- /* may need this to recover from conn loss during COPY */ if (!CheckConnection()) return false; + + /* + * We need a last PQgetResult() call to learn whether COPY + * succeded or not. + */ + if (req_getres) + { + PQclear(*results); + *results = PQgetResult(pset.db); + if (PQresultStatus(*results) != PGRES_COMMAND_OK) + success = false; + } return success; } *************** *** 1058,1065 **** results = PQexec(pset.db, query); /* these operations are included in the timing result: */ ! OK = (AcceptResult(results, query) && ProcessCopyResult(results)); ! if (pset.timing) GETTIMEOFDAY(&after); --- 1072,1079 ---- results = PQexec(pset.db, query); /* these operations are included in the timing result: */ ! OK = (AcceptResult(results, query) && ProcessCopyResult(&results)); ! if (pset.timing) GETTIMEOFDAY(&after); Index: src/bin/psql/copy.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/bin/psql/copy.c,v retrieving revision 1.58 diff -u -c -r1.58 copy.c *** src/bin/psql/copy.c 15 Oct 2005 02:49:40 -0000 1.58 --- src/bin/psql/copy.c 18 Dec 2005 09:48:32 -0000 *************** *** 585,670 **** return success; } ! #define COPYBUFSIZ 8192 /* size doesn't matter */ ! /* * handleCopyOut * receives data as a result of a COPY ... TO stdout command - * - * If you want to use COPY TO in your application, this is the code to steal :) - * - * conn should be a database connection that you just called COPY TO on - * (and which gave you PGRES_COPY_OUT back); - * copystream is the file stream you want the output to go to */ bool handleCopyOut(PGconn *conn, FILE *copystream) { ! bool copydone = false; /* haven't started yet */ ! char copybuf[COPYBUFSIZ]; ! int ret; ! while (!copydone) { ! ret = PQgetline(conn, copybuf, COPYBUFSIZ); ! if (copybuf[0] == '\\' && ! copybuf[1] == '.' && ! copybuf[2] == '\0') { ! copydone = true; /* we're at the end */ } ! else { ! fputs(copybuf, copystream); ! switch (ret) ! { ! case EOF: ! copydone = true; ! /* FALLTHROUGH */ ! case 0: ! fputc('\n', copystream); ! break; ! case 1: ! break; ! } } } fflush(copystream); - ret = !PQendcopy(conn); ResetCancelConn(); ! return ret; } - - /* * handleCopyIn * receives data as a result of a COPY ... FROM stdin command - * - * Again, if you want to use COPY FROM in your application, copy this. - * - * conn should be a database connection that you just called COPY FROM on - * (and which gave you PGRES_COPY_IN back); - * copystream is the file stream you want the input to come from */ - bool handleCopyIn(PGconn *conn, FILE *copystream) { ! const char *prompt; ! bool copydone = false; ! bool firstload; ! bool linedone; ! bool saw_cr = false; ! char copybuf[COPYBUFSIZ]; ! char *s; ! int bufleft; ! int c = 0; ! int ret; ! unsigned int linecount = 0; /* Prompt if interactive input */ if (isatty(fileno(copystream))) --- 585,659 ---- return success; } + /* + * Function routines for handling COPY IN/OUT input/output. + * + * If you want to use COPY TO in your application, this is the + * code to steal ;) + * + * conn should be a database connection that you just issued COPY FROM/TO + * and copystream is the file stream for input/output to read/go. + * + * PGRES_COPY_IN and PGRES_COPY_OUT results will be untouched, so you + * should make a PQgetResult() call at the end to learn whether COPY + * succeeded or not. (This will bring COPY command's commandTag string too.) + */ ! /* Buffer size for a single line during COPY IN/OUT */ ! #define COPYBUFSIZ 8192 /* * handleCopyOut * receives data as a result of a COPY ... TO stdout command */ bool handleCopyOut(PGconn *conn, FILE *copystream) { ! char *buf; ! int ret; ! bool res = true; ! for (;;) { ! ret = PQgetCopyData(conn, &buf, 0); ! /* Buffer is filled by libpq */ ! if (ret > 0) { ! fputs(buf, copystream); ! PQfreemem(buf); } ! ! /* copy done */ ! else if (ret == -1) ! break; ! ! /* oops */ ! else if (ret == -2) { ! res = false; ! break; } } + fflush(copystream); ResetCancelConn(); ! ! return res; } /* * handleCopyIn * receives data as a result of a COPY ... FROM stdin command */ bool handleCopyIn(PGconn *conn, FILE *copystream) { ! const char *prompt; ! char buf[COPYBUFSIZ]; ! unsigned int l; ! bool first, saw_cr; ! int ret; /* Prompt if interactive input */ if (isatty(fileno(copystream))) *************** *** 677,747 **** else prompt = NULL; ! while (!copydone) ! { /* for each input line ... */ if (prompt) { fputs(prompt, stdout); fflush(stdout); } ! firstload = true; ! linedone = false; ! while (!linedone) ! { /* for each bufferload in line ... */ ! /* Fetch string until \n, EOF, or buffer full */ ! s = copybuf; ! for (bufleft = COPYBUFSIZ - 1; bufleft > 0; bufleft--) ! { ! c = getc(copystream); ! if (c == EOF) ! { ! linedone = true; ! break; ! } ! *s++ = c; ! if (c == '\n') ! { ! linedone = true; ! break; ! } ! if (c == '\r') ! saw_cr = true; ! } ! *s = '\0'; ! /* EOF with empty line-so-far? */ ! if (c == EOF && s == copybuf && firstload) ! { ! /* ! * We are guessing a little bit as to the right line-ending ! * here... ! */ ! if (saw_cr) ! PQputline(conn, "\\.\r\n"); ! else ! PQputline(conn, "\\.\n"); ! copydone = true; ! if (pset.cur_cmd_interactive) ! puts("\\."); ! break; ! } ! /* No, so pass the data to the backend */ ! PQputline(conn, copybuf); ! /* Check for line consisting only of \. */ ! if (firstload) ! { ! if (strcmp(copybuf, "\\.\n") == 0 || ! strcmp(copybuf, "\\.\r\n") == 0) ! { ! copydone = true; ! break; ! } ! firstload = false; ! } } ! linecount++; } ! ret = !PQendcopy(conn); ! pset.lineno += linecount; ! return ret; } --- 666,714 ---- else prompt = NULL; ! saw_cr = false; ! first = true; ! for (;;) ! { if (prompt) { fputs(prompt, stdout); fflush(stdout); } ! fgets(buf, COPYBUFSIZ, copystream); ! l = strlen(buf); ! ! if (buf[0] == '\0') ! break; ! else if (strcmp(buf, "\\.\n") == 0 || strcmp(buf, "\\.\r\n") == 0) ! { ! if (buf[2] == '\r') ! saw_cr = true; ! break; ! } ! /* ! * \r will checked just in the first and ! * (as coded above) the last \. line ! */ ! if (first) ! { ! if (strchr(buf, '\r')) ! saw_cr = true; ! first = false; } ! ! if (PQputCopyData(conn, buf, l) < 0) ! return false; ! ! pset.lineno++; } ! ! ret = (saw_cr) ! ? PQputCopyData(conn, "\\.\r\n", 4) ! : PQputCopyData(conn, "\\.\n", 3); ! if (ret < 0) ! return false; ! ! return (PQputCopyEnd(conn, NULL) < 0) ? false : true; } Index: src/include/commands/copy.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/commands/copy.h,v retrieving revision 1.25 diff -u -c -r1.25 copy.h *** src/include/commands/copy.h 31 Dec 2004 22:03:28 -0000 1.25 --- src/include/commands/copy.h 18 Dec 2005 09:48:32 -0000 *************** *** 16,22 **** #include "nodes/parsenodes.h" ! ! extern void DoCopy(const CopyStmt *stmt); #endif /* COPY_H */ --- 16,21 ---- #include "nodes/parsenodes.h" ! extern uint64 DoCopy(const CopyStmt *stmt); #endif /* COPY_H */ Index: src/interfaces/libpq/fe-exec.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v retrieving revision 1.177 diff -u -c -r1.177 fe-exec.c *** src/interfaces/libpq/fe-exec.c 22 Nov 2005 18:17:32 -0000 1.177 --- src/interfaces/libpq/fe-exec.c 18 Dec 2005 09:48:36 -0000 *************** *** 2177,2183 **** char * PQcmdTuples(PGresult *res) { ! char *p; if (!res) return ""; --- 2177,2183 ---- char * PQcmdTuples(PGresult *res) { ! char *p, *c; if (!res) return ""; *************** *** 2195,2201 **** p = res->cmdStatus + 6; else if (strncmp(res->cmdStatus, "FETCH ", 6) == 0) p = res->cmdStatus + 5; ! else if (strncmp(res->cmdStatus, "MOVE ", 5) == 0) p = res->cmdStatus + 4; else return ""; --- 2195,2202 ---- p = res->cmdStatus + 6; else if (strncmp(res->cmdStatus, "FETCH ", 6) == 0) p = res->cmdStatus + 5; ! else if (strncmp(res->cmdStatus, "MOVE ", 5) == 0 || ! strncmp(res->cmdStatus, "COPY ", 5) == 0) p = res->cmdStatus + 4; else return ""; *************** *** 2203,2216 **** p++; if (*p == 0) ! { ! pqInternalNotice(&res->noticeHooks, ! "could not interpret result from server: %s", ! res->cmdStatus); ! return ""; ! } ! ! return p; } /* --- 2204,2222 ---- p++; if (*p == 0) ! goto interpret_error; ! ! /* check if we have an int */ ! for (c = p; isdigit((int) *c); ++c) ! ; ! if (*c == 0) ! return p; ! ! interpret_error: ! pqInternalNotice(&res->noticeHooks, ! "could not interpret result from server: %s", ! res->cmdStatus); ! return ""; } /* Index: doc/src/sgml/libpq.sgml =================================================================== RCS file: /projects/cvsroot/pgsql/doc/src/sgml/libpq.sgml,v retrieving revision 1.199 diff -u -c -r1.199 libpq.sgml *** doc/src/sgml/libpq.sgml 4 Nov 2005 23:14:00 -0000 1.199 --- doc/src/sgml/libpq.sgml 18 Dec 2005 09:49:00 -0000 *************** *** 2129,2144 **** affected by the <acronym>SQL</> statement that generated the <structname>PGresult</>. This function can only be used following the execution of an <command>INSERT</>, ! <command>UPDATE</>, <command>DELETE</>, <command>MOVE</>, or ! <command>FETCH</> statement, or an <command>EXECUTE</> of a ! prepared query that contains a <command>INSERT</>, ! <command>UPDATE</>, or <command>DELETE</> statement. If the ! command that generated the <structname>PGresult</> was ! anything else, <function>PQcmdTuples</> returns the empty ! string. The caller should not free the return value ! directly. It will be freed when the associated ! <structname>PGresult</> handle is passed to ! <function>PQclear</function>. </para> </listitem> </varlistentry> --- 2129,2143 ---- affected by the <acronym>SQL</> statement that generated the <structname>PGresult</>. This function can only be used following the execution of an <command>INSERT</>, ! <command>UPDATE</>, <command>DELETE</>, <command>MOVE</>, ! <command>FETCH</>, or <command>COPY</> statement, or an ! <command>EXECUTE</> of a prepared query that contains a ! <command>INSERT</>, <command>UPDATE</>, <command>DELETE</>, or ! <command>COPY</> statement. If the command that generated the ! <structname>PGresult</> was anything else, <function>PQcmdTuples</> ! returns the empty string. The caller should not free the return ! value directly. It will be freed when the associated ! <structname>PGresult</> handle is passed to <function>PQclear</>. </para> </listitem> </varlistentry>
---------------------------(end of broadcast)--------------------------- TIP 9: In versions below 8.0, the planner will ignore your desire to choose an index scan if your joining column's datatypes do not match