diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 46c7cc5..ffdf9e3 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -56,11 +56,27 @@
 
 PG_MODULE_MAGIC;
 
+typedef struct storeInfo
+{
+	Tuplestorestate *tuplestore;
+	int nattrs;
+	MemoryContext oldcontext;
+	AttInMetadata *attinmeta;
+	TupleDesc tupdesc;
+	char* valbuf;
+	int valbuflen;
+	char **cstrs;
+	bool error_occurred;
+	bool nummismatch;
+} storeInfo;
+
 typedef struct remoteConn
 {
 	PGconn	   *conn;			/* Hold the remote connection */
 	int			openCursorCount;	/* The number of open cursors */
 	bool		newXactForCursor;		/* Opened a transaction for a cursor */
+	bool        materialize_needed; /* Materialize result if true  */
+
 } remoteConn;
 
 /*
@@ -91,11 +107,12 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
 				   int2vector *pkattnums_arg, int32 pknumatts_arg,
 				   int **pkattnums, int *pknumatts);
+static void initStoreInfo(FunctionCallInfo fcinfo, storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
 
 /* Global */
 static remoteConn *pconn = NULL;
 static HTAB *remoteConnHash = NULL;
-
 /*
  *	Following is list that holds multiple remote connections.
  *	Calling convention of each dblink function changes to accept
@@ -112,6 +129,9 @@ typedef struct remoteConnHashEnt
 /* initial number of connection hashes */
 #define NUMCONN 16
 
+/* Initial block size for value buffer in storeHandler */
+#define INITBUFLEN 64
+
 /* general utility */
 #define xpfree(var_) \
 	do { \
@@ -201,6 +221,7 @@ typedef struct remoteConnHashEnt
 				pconn->conn = NULL; \
 				pconn->openCursorCount = 0; \
 				pconn->newXactForCursor = FALSE; \
+				pconn->materialize_needed = false;	\
 			} \
 	} while (0)
 
@@ -229,8 +250,11 @@ dblink_connect(PG_FUNCTION_ARGS)
 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
 	if (connname)
+	{
 		rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
 												  sizeof(remoteConn));
+		rconn->materialize_needed = false;
+	}
 
 	/* first check for valid foreign data server */
 	connstr = get_connect_string(conname_or_str);
@@ -504,6 +528,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	char	   *curname = NULL;
 	int			howmany = 0;
 	bool		fail = true;	/* default to backward compatible */
+	storeInfo   storeInfo;
 
 	prepTuplestoreResult(fcinfo);
 
@@ -557,15 +582,52 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
 	/*
+	 * Result is stored into storeinfo.tuplestore instead of
+	 * res->result retuned by PQexec below
+	 */
+	initStoreInfo(fcinfo, &storeInfo);
+	PQsetRowProcessor(conn, storeHandler, &storeInfo);
+	
+	/*
 	 * Try to execute the query.  Note that since libpq uses malloc, the
 	 * PGresult will be long-lived even though we are still in a short-lived
 	 * memory context.
 	 */
-	res = PQexec(conn, buf.data);
+	PG_TRY();
+	{
+		res = PQexec(conn, buf.data);
+	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
+
+		PQsetRowProcessor(conn, NULL, NULL);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		/* Skip remaining results when storeHandler raises exception. */
+		PQskipResult(conn, TRUE);
+		ReThrowError(edata);
+	}
+	PG_END_TRY();
+	PQsetRowProcessor(conn, NULL, NULL);
+
 	if (!res ||
 		(PQresultStatus(res) != PGRES_COMMAND_OK &&
 		 PQresultStatus(res) != PGRES_TUPLES_OK))
 	{
+		if (storeInfo.nummismatch)
+		{
+			if (res)
+				PQclear(res);
+
+			/* This is only for backward compatibility */
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query result rowtype does not match "
+							"the specified FROM clause rowtype")));
+		}
+
 		dblink_res_error(conname, res, "could not fetch from cursor", fail);
 		return (Datum) 0;
 	}
@@ -577,8 +639,8 @@ dblink_fetch(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_INVALID_CURSOR_NAME),
 				 errmsg("cursor \"%s\" does not exist", curname)));
 	}
+	PQclear(res);
 
-	materializeResult(fcinfo, res);
 	return (Datum) 0;
 }
 
@@ -616,6 +678,8 @@ dblink_send_query(PG_FUNCTION_ARGS)
 	if (retval != 1)
 		elog(NOTICE, "%s", PQerrorMessage(conn));
 
+	rconn->materialize_needed = false;
+	
 	PG_RETURN_INT32(retval);
 }
 
@@ -638,11 +702,12 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible */
 	bool		freeconn = false;
-
-	prepTuplestoreResult(fcinfo);
+	storeInfo   storeInfo;
 
 	DBLINK_INIT;
 
+	prepTuplestoreResult(fcinfo);
+
 	if (!is_async)
 	{
 		if (PG_NARGS() == 3)
@@ -698,31 +763,97 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 
 	if (!conn)
 		DBLINK_CONN_NOT_AVAIL;
-
-	/* synchronous query, or async result retrieval */
-	if (!is_async)
-		res = PQexec(conn, sql);
+	
+	if (!is_async || (rconn && !rconn->materialize_needed))
+	{
+		/*
+		 * Result is stored into storeinfo.tuplestore instead of
+		 * res->result retuned by PQexec/PQgetResult below
+		 */
+		initStoreInfo(fcinfo, &storeInfo);
+		PQsetRowProcessor(conn, storeHandler, &storeInfo);
+	}
 	else
+		storeInfo.nummismatch = false;
+
+	PG_TRY();
 	{
-		res = PQgetResult(conn);
-		/* NULL means we're all done with the async results */
-		if (!res)
-			return (Datum) 0;
+		/* synchronous query, or async result retrieval */
+		if (!is_async)
+			res = PQexec(conn, sql);
+		else
+			res = PQgetResult(conn);
 	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
 
-	/* if needed, close the connection to the database and cleanup */
-	if (freeconn)
-		PQfinish(conn);
+		PQsetRowProcessor(conn, NULL, NULL);
+		edata = CopyErrorData();
+		FlushErrorState();
 
-	if (!res ||
-		(PQresultStatus(res) != PGRES_COMMAND_OK &&
-		 PQresultStatus(res) != PGRES_TUPLES_OK))
+		/* Skip remaining results when storeHandler raises exception. */
+		PQskipResult(conn, TRUE);
+		ReThrowError(edata);
+	}
+	PG_END_TRY();
+	PQsetRowProcessor(conn, NULL, NULL);
+
+	/* NULL res from async get means we're all done with the results */
+	if (res || !is_async)
 	{
-		dblink_res_error(conname, res, "could not execute query", fail);
-		return (Datum) 0;
+		if (freeconn)
+			PQfinish(conn);
+
+		/*
+		 * exclude mismatch of the numbers of the colums here so as to
+		 * behave as before.
+		 */
+		if (!res ||
+			(PQresultStatus(res) != PGRES_COMMAND_OK &&
+			 PQresultStatus(res) != PGRES_TUPLES_OK &&
+			 !storeInfo.nummismatch))
+		{
+			dblink_res_error(conname, res, "could not execute query", fail);
+			return (Datum) 0;
+		}
+
+		/*
+		 * Materialize result if command result or materiarize is needed.
+		 * Current libpq and dblink API design does not allow to use row
+		 * processor for asynchronous query when dblink_is_busy is called prior
+		 * to dblink_get_result.
+		 */
+		if (PQresultStatus(res) == PGRES_COMMAND_OK ||
+			(rconn && rconn->materialize_needed))
+		{
+			materializeResult(fcinfo, res);
+			return (Datum) 0;
+		}
+		else if (get_call_result_type(fcinfo, NULL, NULL) == TYPEFUNC_RECORD)
+		{
+			PQclear(res);
+
+			/* failed to determine actual type of RECORD */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record")));
+		}
+
+		if (storeInfo.nummismatch)
+		{
+			/* This is only for backward compatibility */
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query result rowtype does not match "
+							"the specified FROM clause rowtype")));
+		}
 	}
 
-	materializeResult(fcinfo, res);
+	if (res)
+		PQclear(res);
+
 	return (Datum) 0;
 }
 
@@ -890,375 +1021,518 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res)
 	PG_END_TRY();
 }
 
-/*
- * List all open dblink connections by name.
- * Returns an array of all connection names.
- * Takes no params
- */
-PG_FUNCTION_INFO_V1(dblink_get_connections);
-Datum
-dblink_get_connections(PG_FUNCTION_ARGS)
-{
-	HASH_SEQ_STATUS status;
-	remoteConnHashEnt *hentry;
-	ArrayBuildState *astate = NULL;
-
-	if (remoteConnHash)
-	{
-		hash_seq_init(&status, remoteConnHash);
-		while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
-		{
-			/* stash away current value */
-			astate = accumArrayResult(astate,
-									  CStringGetTextDatum(hentry->name),
-									  false, TEXTOID, CurrentMemoryContext);
-		}
-	}
-
-	if (astate)
-		PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
-											  CurrentMemoryContext));
-	else
-		PG_RETURN_NULL();
-}
-
-/*
- * Checks if a given remote connection is busy
- *
- * Returns 1 if the connection is busy, 0 otherwise
- * Params:
- *	text connection_name - name of the connection to check
- *
- */
-PG_FUNCTION_INFO_V1(dblink_is_busy);
-Datum
-dblink_is_busy(PG_FUNCTION_ARGS)
-{
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
-
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
-
-	PQconsumeInput(conn);
-	PG_RETURN_INT32(PQisBusy(conn));
-}
-
-/*
- * Cancels a running request on a connection
- *
- * Returns text:
- *	"OK" if the cancel request has been sent correctly,
- *		an error message otherwise
- *
- * Params:
- *	text connection_name - name of the connection to check
- *
- */
-PG_FUNCTION_INFO_V1(dblink_cancel_query);
-Datum
-dblink_cancel_query(PG_FUNCTION_ARGS)
-{
-	int			res = 0;
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
-	PGcancel   *cancel;
-	char		errbuf[256];
-
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
-	cancel = PQgetCancel(conn);
-
-	res = PQcancel(cancel, errbuf, 256);
-	PQfreeCancel(cancel);
-
-	if (res == 1)
-		PG_RETURN_TEXT_P(cstring_to_text("OK"));
-	else
-		PG_RETURN_TEXT_P(cstring_to_text(errbuf));
-}
-
-
-/*
- * Get error message from a connection
- *
- * Returns text:
- *	"OK" if no error, an error message otherwise
- *
- * Params:
- *	text connection_name - name of the connection to check
- *
- */
-PG_FUNCTION_INFO_V1(dblink_error_message);
-Datum
-dblink_error_message(PG_FUNCTION_ARGS)
-{
-	char	   *msg;
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
-
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
-
-	msg = PQerrorMessage(conn);
-	if (msg == NULL || msg[0] == '\0')
-		PG_RETURN_TEXT_P(cstring_to_text("OK"));
-	else
-		PG_RETURN_TEXT_P(cstring_to_text(msg));
-}
-
-/*
- * Execute an SQL non-SELECT command
- */
-PG_FUNCTION_INFO_V1(dblink_exec);
-Datum
-dblink_exec(PG_FUNCTION_ARGS)
+static void
+initStoreInfo(FunctionCallInfo fcinfo, storeInfo *sinfo)
 {
-	text	   *volatile sql_cmd_status = NULL;
-	PGconn	   *volatile conn = NULL;
-	volatile bool freeconn = false;
-
-	DBLINK_INIT;
-
-	PG_TRY();
-	{
-		char	   *msg;
-		PGresult   *res = NULL;
-		char	   *connstr = NULL;
-		char	   *sql = NULL;
-		char	   *conname = NULL;
-		remoteConn *rconn = NULL;
-		bool		fail = true;	/* default to backward compatible behavior */
-
-		if (PG_NARGS() == 3)
-		{
-			/* must be text,text,bool */
-			DBLINK_GET_CONN;
-			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-			fail = PG_GETARG_BOOL(2);
-		}
-		else if (PG_NARGS() == 2)
-		{
-			/* might be text,text or text,bool */
-			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
-			{
-				conn = pconn->conn;
-				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
-				fail = PG_GETARG_BOOL(1);
-			}
-			else
-			{
-				DBLINK_GET_CONN;
-				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
-			}
-		}
-		else if (PG_NARGS() == 1)
-		{
-			/* must be single text argument */
-			conn = pconn->conn;
-			sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
-		}
-		else
-			/* shouldn't happen */
-			elog(ERROR, "wrong number of arguments");
-
-		if (!conn)
-			DBLINK_CONN_NOT_AVAIL;
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc       tupdesc = NULL;
+	MemoryContext   oldcontext;
 
-		res = PQexec(conn, sql);
-		if (!res ||
-			(PQresultStatus(res) != PGRES_COMMAND_OK &&
-			 PQresultStatus(res) != PGRES_TUPLES_OK))
-		{
-			dblink_res_error(conname, res, "could not execute command", fail);
+	oldcontext = MemoryContextSwitchTo(
+		rsinfo->econtext->ecxt_per_query_memory);
 
-			/*
-			 * and save a copy of the command status string to return as our
-			 * result tuple
-			 */
-			sql_cmd_status = cstring_to_text("ERROR");
-		}
-		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			/*
-			 * and save a copy of the command status string to return as our
-			 * result tuple
-			 */
-			sql_cmd_status = cstring_to_text(PQcmdStatus(res));
-			PQclear(res);
-		}
-		else
-		{
-			PQclear(res);
-			ereport(ERROR,
-				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-				   errmsg("statement returning results not allowed")));
-		}
-	}
-	PG_CATCH();
+	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
 	{
-		/* if needed, close the connection to the database */
-		if (freeconn)
-			PQfinish(conn);
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	/* if needed, close the connection to the database */
-	if (freeconn)
-		PQfinish(conn);
-
-	PG_RETURN_TEXT_P(sql_cmd_status);
-}
-
-
-/*
- * dblink_get_pkey
- *
- * Return list of primary key fields for the supplied relation,
- * or NULL if none exists.
- */
-PG_FUNCTION_INFO_V1(dblink_get_pkey);
-Datum
-dblink_get_pkey(PG_FUNCTION_ARGS)
-{
-	int16		numatts;
-	char	  **results;
-	FuncCallContext *funcctx;
-	int32		call_cntr;
-	int32		max_calls;
-	AttInMetadata *attinmeta;
-	MemoryContext oldcontext;
-
-	/* stuff done only on the first call of the function */
-	if (SRF_IS_FIRSTCALL())
-	{
-		Relation	rel;
-		TupleDesc	tupdesc;
-
-		/* create a function context for cross-call persistence */
-		funcctx = SRF_FIRSTCALL_INIT();
-
-		/*
-		 * switch to memory context appropriate for multiple function calls
-		 */
-		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
-		/* open target relation */
-		rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
-
-		/* get the array of attnums */
-		results = get_pkey_attnames(rel, &numatts);
-
-		relation_close(rel, AccessShareLock);
-
-		/*
-		 * need a tuple descriptor representing one INT and one TEXT column
-		 */
-		tupdesc = CreateTemplateTupleDesc(2, false);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
-						   INT4OID, -1, 0);
-		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
-						   TEXTOID, -1, 0);
-
-		/*
-		 * Generate attribute metadata needed later to produce tuples from raw
-		 * C strings
-		 */
-		attinmeta = TupleDescGetAttInMetadata(tupdesc);
-		funcctx->attinmeta = attinmeta;
-
-		if ((results != NULL) && (numatts > 0))
-		{
-			funcctx->max_calls = numatts;
-
-			/* got results, keep track of them */
-			funcctx->user_fctx = results;
-		}
-		else
-		{
-			/* fast track when no results */
-			MemoryContextSwitchTo(oldcontext);
-			SRF_RETURN_DONE(funcctx);
-		}
-
-		MemoryContextSwitchTo(oldcontext);
+		case TYPEFUNC_COMPOSITE:
+			tupdesc = CreateTupleDescCopy(tupdesc);
+			sinfo->nattrs = tupdesc->natts;
+			break;
+				
+		case TYPEFUNC_RECORD:
+			tupdesc = CreateTemplateTupleDesc(1, false);
+			TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+							   TEXTOID, -1, 0);
+			sinfo->nattrs = 1;
+			break;
+				
+		default:
+			/* result type isn't composite */
+			elog(ERROR, "return type must be a row type");
+			break;
 	}
 
-	/* stuff done on every call of the function */
-	funcctx = SRF_PERCALL_SETUP();
-
-	/*
-	 * initialize per-call variables
-	 */
-	call_cntr = funcctx->call_cntr;
-	max_calls = funcctx->max_calls;
-
-	results = (char **) funcctx->user_fctx;
-	attinmeta = funcctx->attinmeta;
-
-	if (call_cntr < max_calls)	/* do when there is more left to send */
-	{
-		char	  **values;
-		HeapTuple	tuple;
-		Datum		result;
+	sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+	sinfo->error_occurred = FALSE;
+	sinfo->nummismatch = FALSE;
+	sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+	sinfo->valbuflen = INITBUFLEN;
+	sinfo->valbuf = (char *)palloc(sinfo->valbuflen);
+	sinfo->cstrs = (char **)palloc(sinfo->nattrs * sizeof(char *));
 
-		values = (char **) palloc(2 * sizeof(char *));
-		values[0] = (char *) palloc(12);		/* sign, 10 digits, '\0' */
-
-		sprintf(values[0], "%d", call_cntr + 1);
-
-		values[1] = results[call_cntr];
-
-		/* build the tuple */
-		tuple = BuildTupleFromCStrings(attinmeta, values);
-
-		/* make the tuple into a datum */
-		result = HeapTupleGetDatum(tuple);
+	rsinfo->setResult = sinfo->tuplestore;
+	rsinfo->setDesc = tupdesc;
 
-		SRF_RETURN_NEXT(funcctx, result);
-	}
-	else
-	{
-		/* do when there is no more left */
-		SRF_RETURN_DONE(funcctx);
-	}
+	MemoryContextSwitchTo(oldcontext);
 }
 
-
-/*
- * dblink_build_sql_insert
- *
- * Used to generate an SQL insert statement
- * based on an existing tuple in a local relation.
- * This is useful for selectively replicating data
- * to another server via dblink.
- *
- * API:
- * <relname> - name of local table of interest
- * <pkattnums> - an int2vector of attnums which will be used
- * to identify the local tuple of interest
- * <pknumatts> - number of attnums in pkattnums
- * <src_pkattvals_arry> - text array of key values which will be used
- * to identify the local tuple of interest
- * <tgt_pkattvals_arry> - text array of key values which will be used
- * to build the string for execution remotely. These are substituted
- * for their counterparts in src_pkattvals_arry
- */
-PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
-Datum
-dblink_build_sql_insert(PG_FUNCTION_ARGS)
-{
-	text	   *relname_text = PG_GETARG_TEXT_P(0);
-	int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
-	int32		pknumatts_arg = PG_GETARG_INT32(2);
-	ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
-	ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
-	Relation	rel;
-	int		   *pkattnums;
+ /* Prototype of this function is PQrowProcessor */
+ static int
+ storeHandler(PGresult *res, PGrowValue *columns, void *param)
+ {
+	 storeInfo *sinfo = (storeInfo *)param;
+	 HeapTuple  tuple;
+	 int        newbuflen;
+	 int        fields = PQnfields(res);
+	 int        i;
+	 char       **cstrs = sinfo->cstrs;
+	 char       *pbuf;
+
+	 if (sinfo->error_occurred)
+		 return -1;
+
+	 if (sinfo->nattrs == 0)
+	 {
+		 int i;
+		 TupleDesc tupdesc = CreateTemplateTupleDesc(fields, false);
+
+		 sinfo->nattrs = fields;
+		 for (i = 1 ; i <= fields ; i++)
+			 TupleDescInitEntry(tupdesc, (AttrNumber)i, "hoge",
+								TEXTOID, -1, 0);
+		 sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+		 sinfo->tupdesc = tupdesc;
+	 }
+
+	 if (sinfo->nattrs != fields)
+	 {
+		 sinfo->error_occurred = TRUE;
+		 sinfo->nummismatch = TRUE;
+
+		 /* This error will be processed in dblink_record_internal() */
+		 return -1;
+	 }
+
+	 /*
+	  * value input functions assumes that the input string is
+	  * terminated by zero. We should make the values to be so.
+	  */
+
+	 /*
+	  * The length of the buffer for each field is value length + 1 for
+	  * zero-termination
+	  */
+	 newbuflen = fields;
+	 for(i = 0 ; i < fields ; i++)
+		 newbuflen += columns[i].len;
+
+	 if (newbuflen > sinfo->valbuflen)
+	 {
+		 int tmplen = sinfo->valbuflen * 2;
+		 /*
+		  * Try to (re)allocate in bigger steps to avoid flood of allocations
+		  * on weird data.
+		  */
+		 while (newbuflen > tmplen && tmplen >= 0)
+			 tmplen *= 2;
+
+		 /* Check if the integer was wrap-rounded. */
+		 if (tmplen < 0)
+			 elog(ERROR, "Buffer size for one row exceeds integer limit");
+		 sinfo->valbuf = (char *)repalloc(sinfo->valbuf, tmplen);
+		 sinfo->valbuflen = tmplen;
+	 }
+
+	 pbuf = sinfo->valbuf;
+	 for(i = 0 ; i < fields ; i++)
+	 {
+		 int len = columns[i].len;
+		 if (len < 0)
+			 cstrs[i] = NULL;
+		 else
+		 {
+			 cstrs[i] = pbuf;
+			 memcpy(pbuf, columns[i].value, len);
+			 pbuf += len;
+			 *pbuf++ = '\0';
+		 }
+	 }
+
+	 /*
+	  * These functions may throw exception. It will be caught in
+	  * dblink_record_internal()
+	  */
+	 tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+	 tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+	 return 1;
+ }
+
+ /*
+  * List all open dblink connections by name.
+  * Returns an array of all connection names.
+  * Takes no params
+  */
+ PG_FUNCTION_INFO_V1(dblink_get_connections);
+ Datum
+ dblink_get_connections(PG_FUNCTION_ARGS)
+ {
+	 HASH_SEQ_STATUS status;
+	 remoteConnHashEnt *hentry;
+	 ArrayBuildState *astate = NULL;
+
+	 if (remoteConnHash)
+	 {
+		 hash_seq_init(&status, remoteConnHash);
+		 while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
+		 {
+			 /* stash away current value */
+			 astate = accumArrayResult(astate,
+									   CStringGetTextDatum(hentry->name),
+									   false, TEXTOID, CurrentMemoryContext);
+		 }
+	 }
+
+	 if (astate)
+		 PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+											   CurrentMemoryContext));
+	 else
+		 PG_RETURN_NULL();
+ }
+
+ /*
+  * Checks if a given remote connection is busy
+  *
+  * Returns 1 if the connection is busy, 0 otherwise
+  * Params:
+  *	text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_is_busy);
+ Datum
+ dblink_is_busy(PG_FUNCTION_ARGS)
+ {
+	 char	   *conname = NULL;
+	 PGconn	   *conn = NULL;
+	 remoteConn *rconn = NULL;
+
+	 DBLINK_INIT;
+	 DBLINK_GET_NAMED_CONN;
+	 
+	 /*
+	  * The result will be read by calling dblink_is_busy on current implement.
+	  * This disables to use storeHandler afterwards. Materialize needs return
+	  * type information of dblink_get_result which is not available here.
+	  */
+	 rconn->materialize_needed = true;
+	 
+	 PQconsumeInput(conn);
+	 PG_RETURN_INT32(PQisBusy(conn));
+ }
+
+ /*
+  * Cancels a running request on a connection
+  *
+  * Returns text:
+  *	"OK" if the cancel request has been sent correctly,
+  *		an error message otherwise
+  *
+  * Params:
+  *	text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_cancel_query);
+ Datum
+ dblink_cancel_query(PG_FUNCTION_ARGS)
+ {
+	 int			res = 0;
+	 char	   *conname = NULL;
+	 PGconn	   *conn = NULL;
+	 remoteConn *rconn = NULL;
+	 PGcancel   *cancel;
+	 char		errbuf[256];
+
+	 DBLINK_INIT;
+	 DBLINK_GET_NAMED_CONN;
+	 cancel = PQgetCancel(conn);
+
+	 res = PQcancel(cancel, errbuf, 256);
+	 PQfreeCancel(cancel);
+
+	 if (res == 1)
+		 PG_RETURN_TEXT_P(cstring_to_text("OK"));
+	 else
+		 PG_RETURN_TEXT_P(cstring_to_text(errbuf));
+ }
+
+
+ /*
+  * Get error message from a connection
+  *
+  * Returns text:
+  *	"OK" if no error, an error message otherwise
+  *
+  * Params:
+  *	text connection_name - name of the connection to check
+  *
+  */
+ PG_FUNCTION_INFO_V1(dblink_error_message);
+ Datum
+ dblink_error_message(PG_FUNCTION_ARGS)
+ {
+	 char	   *msg;
+	 char	   *conname = NULL;
+	 PGconn	   *conn = NULL;
+	 remoteConn *rconn = NULL;
+
+	 DBLINK_INIT;
+	 DBLINK_GET_NAMED_CONN;
+
+	 msg = PQerrorMessage(conn);
+	 if (msg == NULL || msg[0] == '\0')
+		 PG_RETURN_TEXT_P(cstring_to_text("OK"));
+	 else
+		 PG_RETURN_TEXT_P(cstring_to_text(msg));
+ }
+
+ /*
+  * Execute an SQL non-SELECT command
+  */
+ PG_FUNCTION_INFO_V1(dblink_exec);
+ Datum
+ dblink_exec(PG_FUNCTION_ARGS)
+ {
+	 text	   *volatile sql_cmd_status = NULL;
+	 PGconn	   *volatile conn = NULL;
+	 volatile bool freeconn = false;
+
+	 DBLINK_INIT;
+
+	 PG_TRY();
+	 {
+		 char	   *msg;
+		 PGresult   *res = NULL;
+		 char	   *connstr = NULL;
+		 char	   *sql = NULL;
+		 char	   *conname = NULL;
+		 remoteConn *rconn = NULL;
+		 bool		fail = true;	/* default to backward compatible behavior */
+
+		 if (PG_NARGS() == 3)
+		 {
+			 /* must be text,text,bool */
+			 DBLINK_GET_CONN;
+			 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+			 fail = PG_GETARG_BOOL(2);
+		 }
+		 else if (PG_NARGS() == 2)
+		 {
+			 /* might be text,text or text,bool */
+			 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+			 {
+				 conn = pconn->conn;
+				 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+				 fail = PG_GETARG_BOOL(1);
+			 }
+			 else
+			 {
+				 DBLINK_GET_CONN;
+				 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+			 }
+		 }
+		 else if (PG_NARGS() == 1)
+		 {
+			 /* must be single text argument */
+			 conn = pconn->conn;
+			 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+		 }
+		 else
+			 /* shouldn't happen */
+			 elog(ERROR, "wrong number of arguments");
+
+		 if (!conn)
+			 DBLINK_CONN_NOT_AVAIL;
+
+		 res = PQexec(conn, sql);
+		 if (!res ||
+			 (PQresultStatus(res) != PGRES_COMMAND_OK &&
+			  PQresultStatus(res) != PGRES_TUPLES_OK))
+		 {
+			 dblink_res_error(conname, res, "could not execute command", fail);
+
+			 /*
+			  * and save a copy of the command status string to return as our
+			  * result tuple
+			  */
+			 sql_cmd_status = cstring_to_text("ERROR");
+		 }
+		 else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+		 {
+			 /*
+			  * and save a copy of the command status string to return as our
+			  * result tuple
+			  */
+			 sql_cmd_status = cstring_to_text(PQcmdStatus(res));
+			 PQclear(res);
+		 }
+		 else
+		 {
+			 PQclear(res);
+			 ereport(ERROR,
+				   (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+					errmsg("statement returning results not allowed")));
+		 }
+	 }
+	 PG_CATCH();
+	 {
+		 /* if needed, close the connection to the database */
+		 if (freeconn)
+			 PQfinish(conn);
+		 PG_RE_THROW();
+	 }
+	 PG_END_TRY();
+
+	 /* if needed, close the connection to the database */
+	 if (freeconn)
+		 PQfinish(conn);
+
+	 PG_RETURN_TEXT_P(sql_cmd_status);
+ }
+
+
+ /*
+  * dblink_get_pkey
+  *
+  * Return list of primary key fields for the supplied relation,
+  * or NULL if none exists.
+  */
+ PG_FUNCTION_INFO_V1(dblink_get_pkey);
+ Datum
+ dblink_get_pkey(PG_FUNCTION_ARGS)
+ {
+	 int16		numatts;
+	 char	  **results;
+	 FuncCallContext *funcctx;
+	 int32		call_cntr;
+	 int32		max_calls;
+	 AttInMetadata *attinmeta;
+	 MemoryContext oldcontext;
+
+	 /* stuff done only on the first call of the function */
+	 if (SRF_IS_FIRSTCALL())
+	 {
+		 Relation	rel;
+		 TupleDesc	tupdesc;
+
+		 /* create a function context for cross-call persistence */
+		 funcctx = SRF_FIRSTCALL_INIT();
+
+		 /*
+		  * switch to memory context appropriate for multiple function calls
+		  */
+		 oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		 /* open target relation */
+		 rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
+
+		 /* get the array of attnums */
+		 results = get_pkey_attnames(rel, &numatts);
+
+		 relation_close(rel, AccessShareLock);
+
+		 /*
+		  * need a tuple descriptor representing one INT and one TEXT column
+		  */
+		 tupdesc = CreateTemplateTupleDesc(2, false);
+		 TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
+							INT4OID, -1, 0);
+		 TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
+							TEXTOID, -1, 0);
+
+		 /*
+		  * Generate attribute metadata needed later to produce tuples from raw
+		  * C strings
+		  */
+		 attinmeta = TupleDescGetAttInMetadata(tupdesc);
+		 funcctx->attinmeta = attinmeta;
+
+		 if ((results != NULL) && (numatts > 0))
+		 {
+			 funcctx->max_calls = numatts;
+
+			 /* got results, keep track of them */
+			 funcctx->user_fctx = results;
+		 }
+		 else
+		 {
+			 /* fast track when no results */
+			 MemoryContextSwitchTo(oldcontext);
+			 SRF_RETURN_DONE(funcctx);
+		 }
+
+		 MemoryContextSwitchTo(oldcontext);
+	 }
+
+	 /* stuff done on every call of the function */
+	 funcctx = SRF_PERCALL_SETUP();
+
+	 /*
+	  * initialize per-call variables
+	  */
+	 call_cntr = funcctx->call_cntr;
+	 max_calls = funcctx->max_calls;
+
+	 results = (char **) funcctx->user_fctx;
+	 attinmeta = funcctx->attinmeta;
+
+	 if (call_cntr < max_calls)	/* do when there is more left to send */
+	 {
+		 char	  **values;
+		 HeapTuple	tuple;
+		 Datum		result;
+
+		 values = (char **) palloc(2 * sizeof(char *));
+		 values[0] = (char *) palloc(12);		/* sign, 10 digits, '\0' */
+
+		 sprintf(values[0], "%d", call_cntr + 1);
+
+		 values[1] = results[call_cntr];
+
+		 /* build the tuple */
+		 tuple = BuildTupleFromCStrings(attinmeta, values);
+
+		 /* make the tuple into a datum */
+		 result = HeapTupleGetDatum(tuple);
+
+		 SRF_RETURN_NEXT(funcctx, result);
+	 }
+	 else
+	 {
+		 /* do when there is no more left */
+		 SRF_RETURN_DONE(funcctx);
+	 }
+ }
+
+
+ /*
+  * dblink_build_sql_insert
+  *
+  * Used to generate an SQL insert statement
+  * based on an existing tuple in a local relation.
+  * This is useful for selectively replicating data
+  * to another server via dblink.
+  *
+  * API:
+  * <relname> - name of local table of interest
+  * <pkattnums> - an int2vector of attnums which will be used
+  * to identify the local tuple of interest
+  * <pknumatts> - number of attnums in pkattnums
+  * <src_pkattvals_arry> - text array of key values which will be used
+  * to identify the local tuple of interest
+  * <tgt_pkattvals_arry> - text array of key values which will be used
+  * to build the string for execution remotely. These are substituted
+  * for their counterparts in src_pkattvals_arry
+  */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
+ Datum
+ dblink_build_sql_insert(PG_FUNCTION_ARGS)
+ {
+	 text	   *relname_text = PG_GETARG_TEXT_P(0);
+	 int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
+	 int32		pknumatts_arg = PG_GETARG_INT32(2);
+	 ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+	 ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
+	 Relation	rel;
+	 int		   *pkattnums;
 	int			pknumatts;
 	char	  **src_pkattvals;
 	char	  **tgt_pkattvals;
