commit f86776463383cec57784b4ca54da6bcd15ceca67
Author: mithun <mithun@localhost.localdomain>
Date:   Thu Nov 24 17:28:41 2016 +0530

    Failover-to-next-writable-connection-05

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 0f375bf..99a0173 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -811,7 +811,7 @@ postgresql://localhost/mydb
 postgresql://user@localhost
 postgresql://user:secret@localhost
 postgresql://other@localhost/otherdb?connect_timeout=10&amp;application_name=myapp
-postgresql://host1:123,host2:456/somedb
+postgresql://host1:123,host2:456/somedb?target_session_attrs=any&amp;application_name=myapp
 </programlisting>
     Components of the hierarchical part of the <acronym>URI</acronym> can also
     be given as parameters.  For example:
@@ -1386,6 +1386,16 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
        </para>
       </listitem>
      </varlistentry>
+     <varlistentry id="libpq-connect-target-session-attrs" xreflabel="target_session_attrs">
+      <term><literal>target_session_attrs</literal></term>
+      <listitem>
+       <para>
+        This parameter specifies what kind of connection is requested. Set to
+        <literal>read-write</literal>, to make a writable connection. Else set
+        to <literal>any</literal>. And, the default is <literal>any</literal>.
+      </para>
+      </listitem>
+    </varlistentry>
     </variablelist>
    </para>
   </sect2>
@@ -7069,6 +7079,16 @@ myEventProc(PGEventId evtId, void *evtInfo, void *passThrough)
       linkend="libpq-connect-client-encoding"> connection parameter.
      </para>
     </listitem>
+
+    <listitem>
+     <para>
+      <indexterm>
+       <primary><envar>PGTARGETSESSIONATTRS</envar></primary>
+      </indexterm>
+      <envar>PGTARGETSESSIONATTRS</envar> behaves the same as the <xref
+      linkend="libpq-connect-target-session-attrs"> connection parameter.
+     </para>
+    </listitem>
    </itemizedlist>
   </para>
 
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3e9c45b..0a0df8d 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -108,6 +108,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
 #define DefaultOption	""
 #define DefaultAuthtype		  ""
 #define DefaultPassword		  ""
+#define DefaultTargetSessionAttrs	"any"
 #ifdef USE_SSL
 #define DefaultSSLMode "prefer"
 #else
@@ -300,6 +301,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"target_session_attrs", "PGTARGETSESSIONATTRS", DefaultTargetSessionAttrs, NULL,
+		"Target-Session-Attrs", "", 11,	/* sizeof("read-write") = 11 */
+	offsetof(struct pg_conn, target_session_attrs)},
+
 	/* Terminating entry --- MUST BE LAST */
 	{NULL, NULL, NULL, NULL,
 	NULL, NULL, 0}
@@ -336,6 +341,8 @@ static PGconn *makeEmptyPGconn(void);
 static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
 static void freePGconn(PGconn *conn);
 static void closePGconn(PGconn *conn);
+static void release_all_addrinfo(PGconn *conn);
+static void sendTerminateConn(PGconn *conn);
 static PQconninfoOption *conninfo_init(PQExpBuffer errorMessage);
 static PQconninfoOption *parse_connection_string(const char *conninfo,
 						PQExpBuffer errorMessage, bool use_defaults);
@@ -1026,6 +1033,24 @@ connectOptions2(PGconn *conn)
 	}
 
 	/*
+	 * Validate target_session_attrs option.
+	 */
+	if (conn->target_session_attrs)
+	{
+		if (strcmp(conn->target_session_attrs, "any") != 0
+			&& strcmp(conn->target_session_attrs, "read-write") != 0)
+		{
+			conn->status = CONNECTION_BAD;
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("invalid target_session_attrs "
+											"value:\"%s\" should be \"any\" or "
+											"\"read-write\"\n"),
+											conn->target_session_attrs);
+			return false;
+		}
+	}
+
+	/*
 	 * Only if we get this far is it appropriate to try to connect. (We need a
 	 * state flag, rather than just the boolean result of this function, in
 	 * case someone tries to PQreset() the PGconn.)
@@ -1814,6 +1839,7 @@ PQconnectPoll(PGconn *conn)
 			/* Special cases: proceed without waiting. */
 		case CONNECTION_SSL_STARTUP:
 		case CONNECTION_NEEDED:
+		case CONNECTION_CHECK_WRITABLE:
 			break;
 
 		default:
@@ -2752,27 +2778,6 @@ keep_going:						/* We will come back to here until there is
 					goto error_return;
 				}
 
-				/* We can release the address lists now. */
-				if (conn->connhost != NULL)
-				{
-					int		i;
-
-					for (i = 0; i < conn->nconnhost; ++i)
-					{
-						int		family = AF_UNSPEC;
-
-#ifdef HAVE_UNIX_SOCKETS
-						if (conn->connhost[i].type == CHT_UNIX_SOCKET)
-							family = AF_UNIX;
-#endif
-
-						pg_freeaddrinfo_all(family,
-											conn->connhost[i].addrlist);
-						conn->connhost[i].addrlist = NULL;
-					}
-				}
-				conn->addr_cur = NULL;
-
 				/* Fire up post-connection housekeeping if needed */
 				if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
 				{
@@ -2782,7 +2787,24 @@ keep_going:						/* We will come back to here until there is
 					return PGRES_POLLING_WRITING;
 				}
 
-				/* Otherwise, we are open for business! */
+				/*
+				 * If a read-write connection is requisted check for same.
+				 */
+				if (conn->target_session_attrs != NULL &&
+					strcmp(conn->target_session_attrs, "read-write") == 0)
+				{
+					conn->status = CONNECTION_OK;
+					if (!PQsendQuery(conn,
+									 "show transaction_read_only"))
+						goto error_return;
+					conn->status = CONNECTION_CHECK_WRITABLE;
+					return PGRES_POLLING_READING;
+				}
+
+				/* We can release the address lists now. */
+				release_all_addrinfo(conn);
+
+				/* We are open for business! */
 				conn->status = CONNECTION_OK;
 				return PGRES_POLLING_OK;
 			}
@@ -2814,10 +2836,111 @@ keep_going:						/* We will come back to here until there is
 					goto error_return;
 			}
 
+			/*
+			 * If a read-write connection is requisted check for same.
+			 */
+			if (conn->target_session_attrs != NULL &&
+				strcmp(conn->target_session_attrs, "read-write") == 0)
+			{
+				conn->status = CONNECTION_OK;
+				if (!PQsendQuery(conn,
+								 "show transaction_read_only"))
+					goto error_return;
+				conn->status = CONNECTION_CHECK_WRITABLE;
+				return PGRES_POLLING_READING;
+			}
+
+			/* We can release the address lists now. */
+			release_all_addrinfo(conn);
+
 			/* We are open for business! */
 			conn->status = CONNECTION_OK;
 			return PGRES_POLLING_OK;
 
+		case CONNECTION_CHECK_WRITABLE:
+			{
+				conn->status = CONNECTION_OK;
+				if (!PQconsumeInput(conn))
+					goto error_return;
+
+				if (PQisBusy(conn))
+				{
+					conn->status = CONNECTION_CHECK_WRITABLE;
+					return PGRES_POLLING_READING;
+				}
+
+				res = PQgetResult(conn);
+				if (res && (PQresultStatus(res) == PGRES_TUPLES_OK) &&
+					PQntuples(res) == 1)
+				{
+					char *val;
+					val = PQgetvalue(res, 0 , 0);
+					if (strncmp(val, "on", 2) == 0)
+					{
+						PQclear(res);
+
+						/*
+						 * This is not a writable connection lets try next
+						 * address.
+						 */
+						appendPQExpBuffer(&conn->errorMessage,
+							libpq_gettext("could not make a writable "
+										  "connection to server "
+										  "\"%s:%s\"\n"),
+										  conn->connhost[conn->whichhost].host,
+										  conn->connhost[conn->whichhost].port);
+						conn->status = CONNECTION_OK;
+						sendTerminateConn(conn);
+						pqDropConnection(conn, true);
+
+						if (conn->addr_cur->ai_next != NULL ||
+							conn->whichhost + 1 < conn->nconnhost)
+						{
+							conn->addr_cur = conn->addr_cur->ai_next;
+							conn->status = CONNECTION_NEEDED;
+							goto keep_going;
+						}
+
+						/* No more addresses to try. So we fail. */
+						goto error_return;
+					}
+					PQclear(res);
+
+					/* We can release the address lists now. */
+					release_all_addrinfo(conn);
+
+					/* We are open for business! */
+					conn->status = CONNECTION_OK;
+						return PGRES_POLLING_OK;
+				}
+
+				/*
+				 * Something went wrong with
+				 * "show transaction_read_only". We should try next addresses.
+				 */
+				if (res)
+					PQclear(res);
+				appendPQExpBuffer(&conn->errorMessage,
+					libpq_gettext("test \"show transaction_read_only\" failed "
+								  " on \"%s:%s\" \n"),
+								  conn->connhost[conn->whichhost].host,
+								  conn->connhost[conn->whichhost].port);
+				conn->status = CONNECTION_OK;
+				sendTerminateConn(conn);
+				pqDropConnection(conn, true);
+
+				if (conn->addr_cur->ai_next != NULL ||
+					conn->whichhost + 1 < conn->nconnhost)
+				{
+					conn->addr_cur = conn->addr_cur->ai_next;
+					conn->status = CONNECTION_NEEDED;
+					goto keep_going;
+				}
+
+				/* No more addresses to try. So we fail. */
+				goto error_return;
+			}
+
 		default:
 			appendPQExpBuffer(&conn->errorMessage,
 							  libpq_gettext("invalid connection state %d, "
@@ -3109,6 +3232,8 @@ freePGconn(PGconn *conn)
 		free(conn->outBuffer);
 	if (conn->rowBuf)
 		free(conn->rowBuf);
+	if (conn->target_session_attrs)
+		free(conn->target_session_attrs);
 	termPQExpBuffer(&conn->errorMessage);
 	termPQExpBuffer(&conn->workBuffer);
 
@@ -3120,19 +3245,41 @@ freePGconn(PGconn *conn)
 }
 
 /*
- * closePGconn
- *	 - properly close a connection to the backend
- *
- * This should reset or release all transient state, but NOT the connection
- * parameters.  On exit, the PGconn should be in condition to start a fresh
- * connection with the same parameters (see PQreset()).
+ * release_all_addrinfo
+ *   - free addrinfo of all hostconn elements.
  */
+
 static void
-closePGconn(PGconn *conn)
+release_all_addrinfo(PGconn *conn)
 {
-	PGnotify   *notify;
-	pgParameterStatus *pstatus;
+	if (conn->connhost != NULL)
+	{
+		int		i;
+
+		for (i = 0; i < conn->nconnhost; ++i)
+		{
+			int		family = AF_UNSPEC;
+
+#ifdef HAVE_UNIX_SOCKETS
+			if (conn->connhost[i].type == CHT_UNIX_SOCKET)
+				family = AF_UNIX;
+#endif
+
+			pg_freeaddrinfo_all(family,
+								conn->connhost[i].addrlist);
+			conn->connhost[i].addrlist = NULL;
+		}
+	}
+	conn->addr_cur = NULL;
+}
 
+/*
+ * sendTerminateConn
+ * 	 - Send a terminate message to backend.
+ */
+static void
+sendTerminateConn(PGconn *conn)
+{
 	/*
 	 * Note that the protocol doesn't allow us to send Terminate messages
 	 * during the startup phase.
@@ -3147,6 +3294,23 @@ closePGconn(PGconn *conn)
 		pqPutMsgEnd(conn);
 		(void) pqFlush(conn);
 	}
+}
+
+/*
+ * closePGconn
+ *	 - properly close a connection to the backend
+ *
+ * This should reset or release all transient state, but NOT the connection
+ * parameters.  On exit, the PGconn should be in condition to start a fresh
+ * connection with the same parameters (see PQreset()).
+ */
+static void
+closePGconn(PGconn *conn)
+{
+	PGnotify   *notify;
+	pgParameterStatus *pstatus;
+
+	sendTerminateConn(conn);
 
 	/*
 	 * Must reset the blocking status so a possible reconnect will work.
@@ -3165,25 +3329,8 @@ closePGconn(PGconn *conn)
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
 	resetPQExpBuffer(&conn->errorMessage);
-	if (conn->connhost != NULL)
-	{
-		int		i;
-
-		for (i = 0; i < conn->nconnhost; ++i)
-		{
-			int		family = AF_UNSPEC;
+	release_all_addrinfo(conn);
 
-#ifdef HAVE_UNIX_SOCKETS
-			if (conn->connhost[i].type == CHT_UNIX_SOCKET)
-				family = AF_UNIX;
-#endif
-
-			pg_freeaddrinfo_all(family,
-								conn->connhost[i].addrlist);
-			conn->connhost[i].addrlist = NULL;
-		}
-	}
-	conn->addr_cur = NULL;
 	notify = conn->notifyHead;
 	while (notify != NULL)
 	{
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 9ca0756..20b7e57 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -62,7 +62,9 @@ typedef enum
 								 * backend startup. */
 	CONNECTION_SETENV,			/* Negotiating environment. */
 	CONNECTION_SSL_STARTUP,		/* Negotiating SSL. */
-	CONNECTION_NEEDED			/* Internal state: connect() needed */
+	CONNECTION_NEEDED,			/* Internal state: connect() needed */
+	CONNECTION_CHECK_WRITABLE	/* Check if we could make a writable
+								 * connection. */
 } ConnStatusType;
 
 typedef enum
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 854ec89..a2f8589 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -361,6 +361,9 @@ struct pg_conn
 	char	   *krbsrvname;		/* Kerberos service name */
 #endif
 
+	char	   *target_session_attrs;	/* Type of connection to make
+										 * Possible values any, read-write. */
+
 	/* Optional file to write trace info to */
 	FILE	   *Pfdebug;
 
