This patch enables dblink to be used for the purpose of efficient
bulk-loading via COPY and libpq in combination with the COPY TO
FUNCTION patch.

The following functions were added to accomplish this:

dblink_connection_reset: useful when handling errors and one just
wants to restore a connection to a known idle state; it rolls back
any open (or failed) transaction on the named connection.

dblink_copy_end: completes the COPY started by dblink_copy_open

dblink_copy_open: puts a connection into the COPY state.  Accepts
connection name, relation name, and binary mode flag.

dblink_copy_write: writes a row to the last connection put in the COPY
state by dblink_copy_open.

Generally speaking, code that uses this will look like the following
(presuming a named connection has already been made):

    try:
        SELECT dblink_copy_open('myconn', 'relation_name', true);
        COPY bar TO FUNCTION dblink_copy_write;

        -- since the dblink connection is still in the COPY state, one
        -- can even copy some more data in multiple steps...
        COPY bar_2 TO FUNCTION dblink_copy_write;

        SELECT dblink_copy_end();
    finally:
        SELECT dblink_connection_reset('myconn');

Signed-off-by: Daniel Farina <dfar...@truviso.com>
---
 contrib/dblink/dblink.c             |  190 +++++++++++++++++++++++++++++++++++
 contrib/dblink/dblink.h             |    5 +
 contrib/dblink/dblink.sql.in        |   20 ++++
 contrib/dblink/uninstall_dblink.sql |    8 ++
 4 files changed, 223 insertions(+), 0 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 72fdf56..d32aeec 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1722,6 +1722,196 @@ dblink_get_notify(PG_FUNCTION_ARGS)
  * internal functions
  */
 
+/*
+ * dblink_connection_reset
+ *
+ * Attempt to return the named connection to a known, idle state by rolling
+ * back whatever transaction (possibly a failed one) is open on it.  At most
+ * one ROLLBACK is issued; if the connection is still not idle afterwards, or
+ * its transaction state is unknown, an error is raised.
+ *
+ * Argument: connection name (text).
+ *
+ * NOTE(review): this does not touch dblink_copy_current, so if the named
+ * connection is the one most recently opened by dblink_copy_open, later
+ * dblink_copy_write / dblink_copy_end calls will still target it.
+ */
+PG_FUNCTION_INFO_V1(dblink_connection_reset);
+Datum
+dblink_connection_reset(PG_FUNCTION_ARGS)
+{
+       PGresult   *res = NULL;
+       PGconn     *conn = NULL;
+       char       *conname = NULL;
+       remoteConn *rconn = NULL;
+       bool        triedonce = false;
+
+       DBLINK_INIT;
+
+       /* must be text */
+       Assert(PG_NARGS() == 1);
+       DBLINK_GET_NAMED_CONN;
+
+       if (!conn)
+               DBLINK_CONN_NOT_AVAIL;
+
+       for (;;)
+       {
+               PGTransactionStatusType txstatus = PQtransactionStatus(conn);
+
+               /* Everything is okay */
+               if (txstatus == PQTRANS_IDLE)
+                       break;
+
+               if (txstatus == PQTRANS_UNKNOWN)
+                       elog(ERROR, "%s: connection in unknown transaction state",
+                                PG_FUNCNAME_MACRO);
+
+               /*
+                * ACTIVE, INTRANS or INERROR.  One ROLLBACK should be enough; if it
+                * has already been issued and the connection is still not idle,
+                * report that instead of silently returning or looping forever.
+                */
+               if (triedonce)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_CONNECTION_FAILURE),
+                                        errmsg("%s: could not restore connection to idle state",
+                                                       PG_FUNCNAME_MACRO)));
+
+               res = PQexec(conn, "ROLLBACK;");
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               {
+                       char       *remote_msg = pstrdup(res ? PQresultErrorMessage(res)
+                                                                                : PQerrorMessage(conn));
+
+                       /* libpq results are not palloc'd: free before ereport */
+                       PQclear(res);
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_CONNECTION_FAILURE),
+                                        errmsg("%s: could not issue ROLLBACK command",
+                                                       PG_FUNCNAME_MACRO),
+                                        errdetail("%s", remote_msg)));
+               }
+               PQclear(res);
+               triedonce = true;
+       }
+
+       PG_RETURN_VOID();
+}
+
+/*
+ * State shared by the dblink COPY functions: the remote connection that
+ * dblink_copy_open most recently put into the COPY IN state.  It is read by
+ * dblink_copy_write and dblink_copy_end, and set back to NULL once
+ * dblink_copy_end has finished the COPY.
+ */
+static PGconn *dblink_copy_current = NULL;
+
+/*
+ * dblink_copy_open
+ *
+ * Start a COPY ... FROM STDIN into the given relation on the named remote
+ * connection, and remember that connection as the target of subsequent
+ * dblink_copy_write / dblink_copy_end calls.
+ *
+ * Arguments: connection name (text), relation name (text, may be
+ * qualified), binary-mode flag (bool).  Returns the text "OK".
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_open);
+Datum
+dblink_copy_open(PG_FUNCTION_ARGS)
+{
+       PGresult   *res = NULL;
+       PGconn     *conn = NULL;
+       char       *conname = NULL;
+       remoteConn *rconn = NULL;
+       char       *quoted_relname;
+       bool        isbinary;
+       StringInfoData buf;
+
+       DBLINK_INIT;
+
+       /* must be text,text,bool */
+       Assert(PG_NARGS() == 3);
+       DBLINK_GET_NAMED_CONN;
+
+       if (!conn)
+               DBLINK_CONN_NOT_AVAIL;
+
+       /* Read the procedure arguments into primitive values */
+       quoted_relname = NameListToQuotedString(
+               textToQualifiedNameList(PG_GETARG_TEXT_P(1)));
+       isbinary = PG_GETARG_BOOL(2);
+
+       /*
+        * Query parameterization only handles value-parameters -- identifiers
+        * are not among them -- so the statement text is built by hand, which is
+        * why the relation name was quoted above.  A StringInfo is used because
+        * a qualified, quoted name has no small fixed bound: each component may
+        * be up to NAMEDATALEN-1 bytes, and quoting doubles embedded quotes.
+        */
+       initStringInfo(&buf);
+       appendStringInfo(&buf, "COPY %s FROM STDIN%s;",
+                                        quoted_relname,
+                                        isbinary ? " WITH BINARY" : "");
+
+       /*
+        * Run the created query, and check to ensure that PGRES_COPY_IN state
+        * has been achieved.
+        */
+       res = PQexec(conn, buf.data);
+       pfree(buf.data);
+
+       if (!res || PQresultStatus(res) != PGRES_COPY_IN)
+       {
+               char       *remote_msg = pstrdup(res ? PQresultErrorMessage(res)
+                                                                        : PQerrorMessage(conn));
+
+               /* libpq results are not palloc'd: free before ereport */
+               PQclear(res);
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_FAILURE),
+                                errmsg("could not start COPY FROM on remote node"),
+                                errdetail("%s", remote_msg)));
+       }
+       PQclear(res);
+
+       /*
+        * Everything went well; finally bind the global dblink_copy_current to
+        * the connection ready to accept copy data.
+        */
+       dblink_copy_current = conn;
+       PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
+
+/*
+ * dblink_copy_write
+ *
+ * Send the provided StringInfo's bytes to the COPY opened by the most recent
+ * dblink_copy_open.  Meant to be named in COPY ... TO FUNCTION; presumably
+ * each call carries one formatted row (TODO confirm against that patch).
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_write);
+Datum
+dblink_copy_write(PG_FUNCTION_ARGS)
+{
+       StringInfo  copybuf = (StringInfo) PG_GETARG_POINTER(0);
+
+       /* Without an open COPY there is no connection to write to */
+       if (dblink_copy_current == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("no dblink COPY in progress"),
+                                errhint("Call dblink_copy_open first.")));
+
+       if (PQputCopyData(dblink_copy_current, copybuf->data, copybuf->len) != 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_EXCEPTION),
+                                errmsg("could not send row to remote node"),
+                                errdetail("%s", PQerrorMessage(dblink_copy_current))));
+
+       PG_RETURN_VOID();
+}
+
+/*
+ * dblink_copy_end
+ *
+ * Finish the COPY opened by the most recent dblink_copy_open and check the
+ * remote command's result(s).  Returns the text "OK".
+ *
+ * dblink_copy_current is cleared before anything can fail: once the end of
+ * the COPY has been attempted, the remote COPY is no longer open for writing,
+ * so later dblink_copy_write calls must not be sent to that connection.
+ */
+PG_FUNCTION_INFO_V1(dblink_copy_end);
+Datum
+dblink_copy_end(PG_FUNCTION_ARGS)
+{
+       PGconn     *conn = dblink_copy_current;
+       PGresult   *res;
+
+       if (conn == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("no dblink COPY in progress"),
+                                errhint("Call dblink_copy_open first.")));
+
+       dblink_copy_current = NULL;
+
+       /* Check to ensure that termination data was sent successfully */
+       if (PQputCopyEnd(conn, NULL) != 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_EXCEPTION),
+                                errmsg("COPY end failed"),
+                                errdetail("%s", PQerrorMessage(conn))));
+
+       /* Every result the remote side returns for the COPY must be a success */
+       while ((res = PQgetResult(conn)) != NULL)
+       {
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               {
+                       char       *remote_msg = pstrdup(PQresultErrorMessage(res));
+
+                       /* libpq results are not palloc'd: free before elog */
+                       PQclear(res);
+                       elog(ERROR, "COPY failed: %s", remote_msg);
+               }
+               PQclear(res);
+       }
+
+       PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
 
 /*
  * get_pkey_attnames
diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h
index 255f5d0..8a2faee 100644
--- a/contrib/dblink/dblink.h
+++ b/contrib/dblink/dblink.h
@@ -59,4 +59,9 @@ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
 extern Datum dblink_current_query(PG_FUNCTION_ARGS);
 extern Datum dblink_get_notify(PG_FUNCTION_ARGS);
 
+/* return a named connection to an idle state by rolling back its transaction */
+extern Datum dblink_connection_reset(PG_FUNCTION_ARGS);
+
+/* remote COPY ... FROM STDIN: open, write data (via COPY TO FUNCTION), end */
+extern Datum dblink_copy_open(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_write(PG_FUNCTION_ARGS);
+extern Datum dblink_copy_end(PG_FUNCTION_ARGS);
 #endif   /* DBLINK_H */
diff --git a/contrib/dblink/dblink.sql.in b/contrib/dblink/dblink.sql.in
index da5dd65..aedca34 100644
--- a/contrib/dblink/dblink.sql.in
+++ b/contrib/dblink/dblink.sql.in
@@ -221,3 +221,23 @@ CREATE OR REPLACE FUNCTION dblink_get_notify(
 RETURNS setof record
 AS 'MODULE_PATHNAME', 'dblink_get_notify'
 LANGUAGE C STRICT;
+
+-- Roll back any open transaction on the named connection (arg: connection name).
+CREATE OR REPLACE FUNCTION dblink_connection_reset (text)
+RETURNS void
+AS 'MODULE_PATHNAME','dblink_connection_reset'
+LANGUAGE C STRICT;
+
+-- Start COPY <relation> FROM STDIN on the named connection.
+-- Args: connection name, relation name, binary-mode flag.  Returns 'OK'.
+CREATE OR REPLACE FUNCTION dblink_copy_open (text, text, boolean)
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_copy_open'
+LANGUAGE C STRICT;
+
+-- Sends data to the COPY started by dblink_copy_open.  Takes type internal,
+-- so it is meant to be named in COPY ... TO FUNCTION, not called directly.
+CREATE OR REPLACE FUNCTION dblink_copy_write (internal)
+RETURNS void
+AS 'MODULE_PATHNAME','dblink_copy_write'
+LANGUAGE C STRICT;
+
+-- Finish the COPY started by dblink_copy_open.  Returns 'OK'.
+CREATE OR REPLACE FUNCTION dblink_copy_end ()
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_copy_end'
+LANGUAGE C STRICT;
diff --git a/contrib/dblink/uninstall_dblink.sql 
b/contrib/dblink/uninstall_dblink.sql
index 45cf13c..465beb7 100644
--- a/contrib/dblink/uninstall_dblink.sql
+++ b/contrib/dblink/uninstall_dblink.sql
@@ -11,6 +11,14 @@ DROP FUNCTION dblink_build_sql_delete (text, int2vector, 
int4, _text);
 
 DROP FUNCTION dblink_build_sql_insert (text, int2vector, int4, _text, _text);
 
+-- remote COPY support functions
+DROP FUNCTION dblink_copy_end ();
+
+DROP FUNCTION dblink_copy_open (text, text, boolean);
+
+DROP FUNCTION dblink_copy_write (internal);
+
+-- connection reset
+DROP FUNCTION dblink_connection_reset (text);
+
 DROP FUNCTION dblink_get_pkey (text);
 
 DROP TYPE dblink_pkey_results;
-- 
1.6.5.3


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to