Ok, I have included the patched file.
--
Tatsuo Ishii
SRA OSS, Inc. Japan

> --- On Fri, 3/4/09, Tatsuo Ishii <is...@sraoss.co.jp> wrote:
> > > I'm trying to patch against the pgpool-II-2.2
> > source, either I'm either too much of a cretin to apply
> > it or pool_proto_modules.c has other changes?
> > 
> > The patch was against CVS HEAD. Could you try included new
> > patch? It
> > was generated using diff -u (previous one was diff -c) and
> > seems to be
> > nicely applied to 2.2-stable.
> 
> Hmm, I still get the same when I try to patch with that.
> 
> 
>       
/* -*-pgsql-c-*- */
/*
 * $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.6 2009/01/22 
09:16:37 y-mori Exp $
 * 
 * pgpool: a language independent connection pool server for PostgreSQL 
 * written by Tatsuo Ishii
 *
 * Copyright (c) 2003-2009      PgPool Global Development Group
 *
 * Permission to use, copy, modify, and distribute this software and
 * its documentation for any purpose and without fee is hereby
 * granted, provided that the above copyright notice appear in all
 * copies and that both that copyright notice and this permission
 * notice appear in supporting documentation, and that the name of the
 * author not be used in advertising or publicity pertaining to
 * distribution of the software without specific, written prior
 * permission. The author makes no representations about the
 * suitability of this software for any purpose.  It is provided "as
 * is" without express or implied warranty.
 *
 *---------------------------------------------------------------------
 * pool_proto_modules.c: modules corresponding to message protocols.
 * used by pool_process_query()
 *---------------------------------------------------------------------
 */
#include "config.h"
#include <errno.h>

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif


#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <ctype.h>

#include "pool.h"
#include "pool_signal.h"
#include "pool_proto_modules.h"

/*
 * Set to 1 by SimpleQuery()/Execute() when a statement must be replicated
 * even though master/slave mode is active; reset to 0 at the start of
 * SimpleQuery().
 */
int force_replication;
int replication_was_enabled;            /* replication mode was enabled */
int master_slave_was_enabled;   /* master/slave mode was enabled */
int internal_transaction_started;               /* a transaction has been started
                                                   internally in order to issue
                                                   a table lock command */
int in_progress = 0;            /* set while doing something after receiving Query */
int mismatch_ntuples;   /* number of updated tuples */
char *copy_table = NULL;  /* copy table name */
char *copy_schema = NULL;  /* schema of the copy table (presumably; set along
                              with the other copy_* variables -- verify in
                              check_copy_from_stdin()) */
char copy_delimiter; /* copy delimiter char */
char *copy_null = NULL; /* copy null string */
/*
 * Pending update of the prepared statement list. SimpleQuery() points this at
 * add_prepared_list, del_prepared_list or delete_all_prepared_list and stores
 * the matching portal (or NULL) in pending_prepared_portal.
 * NOTE(review): the call site is outside this part of the file -- confirm
 * when the pending function is actually invoked.
 */
void (*pending_function)(PreparedStatementList *p, Portal *portal) = NULL;
Portal *pending_prepared_portal = NULL;
Portal *unnamed_statement = NULL;
Portal *unnamed_portal = NULL;
int select_in_transaction = 0; /* non 0 if select query is in transaction */
int execute_select = 0; /* set to 1 by Execute(), together with
                           select_in_transaction, when it turns replication
                           off to run a SELECT on the master node only */

/* non 0 if "BEGIN" query with extended query protocol received */
int receive_extended_begin = 0;
/* non 0 if allow to close internal transaction */
int allow_close_transaction = 1;

PreparedStatementList prepared_list; /* prepared statement name list */

int is_select_pgcatalog = 0; /* result of IsSelectpgcatalog() for the last
                                SELECT checked by SimpleQuery() */
int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR UPDATE */
bool is_parallel_table = false; /* result of is_partition_table(), evaluated
                                   by SimpleQuery() in PARALLEL_MODE */

/*
 * last query string sent to simpleQuery()
 */
char query_string_buffer[QUERY_STRING_BUFFER_LEN];

/*
 * Query string produced by nodeToString() in SimpleQuery().
 * This variable is only useful when enable_query_cache is true.
 */
char *parsed_query = NULL;

/*
 * Forward a NotificationResponse ('A') message to the frontend.
 *
 * The notification (backend pid + condition string) is read from every
 * valid backend so that all connections stay in sync, but only the copy
 * received from the master node is relayed to the frontend.
 *
 * Returns the status of the final write to the frontend, POOL_ERROR if
 * reading the pid fails or memory cannot be allocated, and POOL_END if
 * reading the condition string fails.
 */
POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
                                 POOL_CONNECTION_POOL *backend)
{
        int pid, pid1 = 0;      /* pid1: never send an indeterminate value */
        char *condition, *condition1 = NULL;
        int len, len1 = 0;
        int i;
        POOL_STATUS status;

        pool_write(frontend, "A", 1);

        for (i = 0; i < NUM_BACKENDS; i++)
        {
                if (!VALID_BACKEND(i))
                        continue;

                if (pool_read(CONNECTION(backend, i), &pid, sizeof(pid)) < 0)
                {
                        status = POOL_ERROR;
                        goto cleanup;
                }

                condition = pool_read_string(CONNECTION(backend, i), &len, 0);
                if (condition == NULL)
                {
                        status = POOL_END;
                        goto cleanup;
                }

                if (IS_MASTER_NODE_ID(i))
                {
                        /*
                         * Keep a private copy: the string is still needed
                         * after the remaining backends have been read.
                         */
                        free(condition1);
                        condition1 = strdup(condition);
                        if (condition1 == NULL)
                        {
                                pool_error("NotificationResponse: strdup failed");
                                status = POOL_ERROR;
                                goto cleanup;
                        }
                        pid1 = pid;
                        len1 = len;
                }
        }

        pool_write(frontend, &pid1, sizeof(pid1));
        status = pool_write_and_flush(frontend, condition1, len1);

cleanup:
        /* single exit point so the copy is released on error paths too */
        free(condition1);
        return status;
}

/*
 * Process Query('Q') message
 * Query messages include a SQL string.
 *
 * frontend: connection to the client. NOTE(review): callers apparently may
 *           pass NULL (see the "frontend &&" test before the EXECUTE
 *           lookup) -- confirm.
 * backend:  pool of backend connections the query is dispatched to.
 * query:    SQL string to run, or NULL to read the Query message body
 *           from the frontend.
 *
 * Returns POOL_CONTINUE once the query has been sent and answered (or was
 * served from the query cache / handled as "show pool_status"), POOL_END or
 * POOL_ERROR on failure, or the status handed back by the parallel engine,
 * the query rewriter or insert_lock() when one of those takes over.
 *
 * Every return path taken after raw_parser() releases the parser memory
 * with free_parser().
 */
POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
                        POOL_CONNECTION_POOL *backend, char *query)
{
        char *string, *string1;
        int len;
        static char *sq = "show pool_status";
        int i, commit;
        List *parse_tree_list;
        Node *node = NULL, *node1;
        POOL_STATUS status;
        int serialization_error_detected = 0;
        int deadlock_detected = 0;
        int active_sql_transaction_error = 0;

        POOL_MEMORY_POOL *old_context = NULL;
        Portal *portal;

        force_replication = 0;
        if (query == NULL)      /* need to read query from frontend? */
        {
                /* read actual query */
                if (MAJOR(backend) == PROTO_MAJOR_V3)
                {
                        if (pool_read(frontend, &len, sizeof(len)) < 0)
                                return POOL_END;
                        len = ntohl(len) - 4;
                        string = pool_read2(frontend, len);
                }
                else
                        string = pool_read_string(frontend, &len, 0);

                if (string == NULL)
                        return POOL_END;
        }
        else
        {
                len = strlen(query)+1;
                string = query;
        }

        /*
         * save last query string for logging purpose.
         * strncpy() does not terminate the destination when the source is
         * at least as long as the buffer, so terminate it explicitly.
         */
        strncpy(query_string_buffer, string, sizeof(query_string_buffer) - 1);
        query_string_buffer[sizeof(query_string_buffer) - 1] = '\0';

        /* show ps status */
        query_ps_status(string, backend);

        /* log query to log file if necessary */
        if (pool_config->log_statement)
        {
                pool_log("statement: %s", string);
        }
        else
        {
                pool_debug("statement2: %s", string);
        }

        /* parse SQL string */
        parse_tree_list = raw_parser(string);

        if (parse_tree_list != NIL)
        {
                node = (Node *) lfirst(list_head(parse_tree_list));

                if (PARALLEL_MODE)
                        is_parallel_table = is_partition_table(backend,node);

                if (pool_config->enable_query_cache &&
                        SYSDB_STATUS == CON_UP &&
                        IsA(node, SelectStmt) &&
                        !(is_select_pgcatalog = IsSelectpgcatalog(node, backend)))
                {
                        SelectStmt *select = (SelectStmt *)node;

                        if (! (select->intoClause || select->lockingClause))
                        {
                                parsed_query = strdup(nodeToString(node));
                                if (parsed_query == NULL)
                                {
                                        pool_error("pool_process_query: malloc failed");
                                        free_parser();
                                        return POOL_ERROR;
                                }

                                /* answer from the query cache if possible */
                                if (pool_query_cache_lookup(frontend, parsed_query, backend->info->database, TSTATE(backend)) == POOL_CONTINUE)
                                {
                                        free(parsed_query);
                                        parsed_query = NULL;
                                        free_parser();
                                        return POOL_CONTINUE;
                                }
                                is_select_for_update = 0;
                        }
                        else
                        {
                                is_select_for_update = 1;
                        }
                }

                if (pool_config->parallel_mode)
                {
                        /*
                         * The Query is analyzed first in a parallel
                         * mode(in_parallel_query), and, next, the Query is
                         * rewritten(rewrite_query_stmt).
                         */

                        /* analyze the query */
                        RewriteQuery *r_query = is_parallel_query(node,backend);

                        if(r_query->is_loadbalance)
                        {
                                /*
                                 * Usual processing of pgpool is done by using
                                 * the rewritten Query if judged a possible
                                 * load-balancing as a result of analyzing the
                                 * Query. Of course, the load is distributed
                                 * only for load_balance_mode=true.
                                 */
                                if(r_query->r_code ==  SEND_LOADBALANCE_ENGINE)
                                {
                                        /* use rewritten query */
                                        string = r_query->rewrite_query;
                                        /* change query length */
                                        len = strlen(string)+1;
                                }
                                pool_debug("SimpleQuery: loadbalance_query =%s",string);
                        }
                        else if (r_query->is_parallel)
                        {
                                /*
                                 * For the Query that the parallel processing
                                 * is possible. Call parallel exe engine and
                                 * return status to the upper layer.
                                 */
                                POOL_STATUS stats = pool_parallel_exec(frontend,backend,r_query->rewrite_query, node,true);
                                free_parser();
                                in_progress = 0;
                                return stats;
                        }
                        else if(!r_query->is_pg_catalog)
                        {
                                /* rewrite query and execute */
                                r_query = rewrite_query_stmt(node,frontend,backend,r_query);
                                if(r_query->type == T_InsertStmt)
                                {
                                        /*
                                         * Release the parser memory only when
                                         * we really return here. With
                                         * INSERT_DIST_NO_RULE we fall through
                                         * to the usual processing, which still
                                         * reads "node" and calls free_parser()
                                         * itself later on.
                                         */
                                        if(r_query->r_code != INSERT_DIST_NO_RULE)
                                        {
                                                status = r_query->status;
                                                free_parser();
                                                in_progress = 0;
                                                return status;
                                        }
                                }
                                else if(r_query->type == T_SelectStmt)
                                {
                                        status = r_query->status;
                                        free_parser();
                                        in_progress = 0;
                                        return status;
                                }
                        }
                        /*
                         * The same processing as usual pgpool is done to
                         * other Query type.
                         */
                }

                /* check COPY FROM STDIN
                 * if true, set copy_* variable
                 */
                check_copy_from_stdin(node);

                /*
                 * if this is DROP DATABASE command, send USR1 signal to parent
                 * and ask it to close all idle connections.
                 * XXX This is overkill. It would be better to close the idle
                 * connection for the database which DROP DATABASE command tries
                 * to drop. This is impossible at this point, since we have no
                 * way to pass such info to other processes.
                 */
                if (is_drop_database(node))
                {
                        int stime = 5;  /* XXX give arbitrary time to allow closing idle connections */

                        pool_debug("Query: sending SIGUSR1 signal to parent");

                        Req_info->kind = CLOSE_IDLE_REQUEST;
                        kill(getppid(), SIGUSR1);               /* send USR1 signal to parent */

                        /* we need to loop over here since we will get USR1 signal while sleeping */
                        while (stime > 0)
                        {
                                stime = sleep(stime);
                        }
                }

                /* process status reporting? */
                if (IsA(node, VariableShowStmt) && strncasecmp(sq, string, strlen(sq)) == 0)
                {
                        StartupPacket *sp;
                        char psbuf[1024];

                        pool_debug("process reporting");
                        process_reporting(frontend, backend);
                        in_progress = 0;

                        /* show ps status */
                        sp = MASTER_CONNECTION(backend)->sp;
                        snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
                                         sp->user, sp->database, remote_ps_data);
                        set_ps_display(psbuf, false);

                        free_parser();
                        return POOL_CONTINUE;
                }

                if (IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
                        IsA(node, VariableSetStmt) || IsA(node, DiscardStmt))
                {
                        /*
                         * PREPARE, DEALLOCATE and SET statements must be
                         * replicated.
                         */
                        if (MASTER_SLAVE && TSTATE(backend) != 'E')
                                force_replication = 1;

                        /*
                         * Before we do followings only when frontend == NULL,
                         * which was wrong since if, for example,
                         * reset_query_list contains "DISCARD ALL", then it
                         * does not register pending function and it causes
                         * trying to DEALLOCATE non existing prepared
                         * statement (2009/4/3 Tatsuo).
                         */
                        if (IsA(node, PrepareStmt) || IsA(node, DeallocateStmt))
                        {
                                /*
                                 * PREPARE and DEALLOCATE are remembered the
                                 * same way; only the pending list operation
                                 * differs.
                                 */
                                if (IsA(node, PrepareStmt))
                                        pending_function = add_prepared_list;
                                else
                                        pending_function = del_prepared_list;

                                portal = create_portal();
                                if (portal == NULL)
                                {
                                        pool_error("SimpleQuery: create_portal() failed");
                                        free_parser();
                                        return POOL_END;
                                }

                                /*
                                 * switch memory context so that the copied
                                 * statement is allocated in the portal's own
                                 * context
                                 */
                                old_context = pool_memory;
                                pool_memory = portal->prepare_ctxt;

                                portal->portal_name = NULL;
                                portal->stmt = copyObject(node);
                                portal->sql_string = NULL;
                                pending_prepared_portal = portal;
                        }
                        else if (IsA(node, DiscardStmt))
                        {
                                DiscardStmt *stmt = (DiscardStmt *)node;
                                if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS)
                                {
                                        pending_function = delete_all_prepared_list;
                                        pending_prepared_portal = NULL;
                                }
                        }

                        /* switch old memory context */
                        if (old_context)
                                pool_memory = old_context;

                        /* end of wrong if (see 2009/4/3 comment above) */
                }

                /*
                 * For EXECUTE of a known prepared statement, base the load
                 * balance decision on the statement that was prepared rather
                 * than on the EXECUTE itself.
                 */
                if (frontend && IsA(node, ExecuteStmt))
                {
                        Portal *prepared;
                        PrepareStmt *p_stmt;
                        ExecuteStmt *e_stmt = (ExecuteStmt *)node;

                        prepared = lookup_prepared_statement_by_statement(&prepared_list,
                                                                          e_stmt->name);
                        if (!prepared)
                        {
                                string1 = string;
                                node1 = node;
                        }
                        else
                        {
                                p_stmt = (PrepareStmt *)prepared->stmt;
                                string1 = nodeToString(p_stmt->query);
                                node1 = (Node *)p_stmt->query;
                        }
                }
                else
                {
                        string1 = string;
                        node1 = node;
                }

                /* load balance trick */
                if (load_balance_enabled(backend, node1, string1))
                        start_load_balance(backend);
                else if (MASTER_SLAVE)
                {
                        pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
                        master_slave_was_enabled = 1;
                        MASTER_SLAVE = 0;
                        master_slave_dml = 1;
                        if (force_replication)
                        {
                                replication_was_enabled = 0;
                                REPLICATION = 1;
                        }
                }
                else if (REPLICATION &&
                                 !pool_config->replicate_select &&
                                 is_select_query(node1, string1) &&
                                 !is_sequence_query(node1))
                {
                        selected_slot = MASTER_NODE_ID;
                        replication_was_enabled = 1;
                        REPLICATION = 0;
                        LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
                        in_load_balance = 1;
                        select_in_transaction = 1;
                }


                /*
                 * determine if we need to lock the table
                 * to keep SERIAL data consistency among servers
                 * conditions:
                 * - replication is enabled
                 * - protocol is V3
                 * - statement is INSERT
                 * - either "INSERT LOCK" comment exists or insert_lock
                 *   directive specified
                 *
                 * (An "else if (REPLICATION && ...)" branch used to follow
                 * this block; it could never be taken and has been removed.)
                 */
                if (REPLICATION)
                {
                        /* start a transaction if needed */
                        if (start_internal_transaction(backend, (Node *)node) != POOL_CONTINUE)
                        {
                                free_parser();
                                return POOL_END;
                        }

                        /* check if need lock */
                        if (need_insert_lock(backend, string, node))
                        {
                                /* if so, issue lock command */
                                status = insert_lock(backend, string, (InsertStmt *)node);
                                if (status != POOL_CONTINUE)
                                {
                                        free_parser();
                                        return status;
                                }
                        }
                }
        }
        else
        {  /* syntax error */
                if (MASTER_SLAVE)
                {
                        pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
                        master_slave_was_enabled = 1;
                        MASTER_SLAVE = 0;
                        master_slave_dml = 1;
                }
        }

        /*
         * NOTE(review): node is still NULL here when parsing failed; confirm
         * that is_start_transaction_query() and is_commit_query() accept NULL.
         */
        if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
        {
                TSTATE(backend) = 'T';
        }

        if (REPLICATION || PARALLEL_MODE)
        {
                /* check if query is "COMMIT" */
                commit = is_commit_query(node);
                free_parser();

                /* send query to master node */
                if (!commit)
                {
                        if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
                                return POOL_END;

                        if (wait_for_query_response(MASTER(backend), string) != POOL_CONTINUE)
                                return POOL_END;

                        /*
                         * Check dead lock error on the master node and abort
                         * transactions on all nodes if so.
                         */
                        deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
                        if (deadlock_detected < 0)
                                return POOL_END;

                        /*
                         * Check serialization failure error and abort
                         * transactions on all nodes if so. Otherwise we allow
                         * data inconsistency among DB nodes. See following
                         * scenario: (M:master, S:slave)
                         *
                         * M:S1:BEGIN;
                         * M:S2:BEGIN;
                         * S:S1:BEGIN;
                         * S:S2:BEGIN;
                         * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * M:S1:UPDATE t1 SET i = i + 1;
                         * S:S1:UPDATE t1 SET i = i + 1;
                         * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
                         * S:S1:COMMIT;
                         * M:S1:COMMIT;
                         * M:S2:ERROR:  could not serialize access due to
                         *              concurrent update
                         * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE
                         *              and data becomes inconsistent!
                         */
                        serialization_error_detected = detect_serialization_error(MASTER(backend), MAJOR(backend));
                        if (serialization_error_detected < 0)
                                return POOL_END;

                        /*
                         * check "SET TRANSACTION ISOLATION LEVEL must be
                         * called before any query" error.
                         * This happens in following scenario:
                         *
                         * M:S1:BEGIN;
                         * S:S1:BEGIN;
                         * M:S1:SELECT 1; <-- only sent to MASTER
                         * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * M: <-- error
                         * S: <-- ok since no previous SELECT is sent. kind
                         *        mismatch error occurs!
                         */
                        active_sql_transaction_error = detect_active_sql_transaction_error(MASTER(backend), MAJOR(backend));
                        if (active_sql_transaction_error < 0)
                                return POOL_END;
                }

                /*
                 * If the master reported one of the errors above, send
                 * POOL_ERROR_QUERY to the other nodes instead of the
                 * original query.
                 */
                if (deadlock_detected == SPECIFIED_ERROR || serialization_error_detected == SPECIFIED_ERROR ||
                        active_sql_transaction_error == SPECIFIED_ERROR)
                {
                        if (deadlock_detected == SPECIFIED_ERROR)
                                pool_log("SimpleQuery: received deadlock error message from master node. query: %s", string);
                        else if (serialization_error_detected == SPECIFIED_ERROR)
                                pool_log("SimpleQuery: received serialization failure error message from master node. query: %s", string);
                        else
                                pool_log("SimpleQuery: received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s", string);
                        string = POOL_ERROR_QUERY;
                        len = strlen(string) + 1;
                }

                /* send query to other than master nodes */
                for (i=0;i<NUM_BACKENDS;i++)
                {
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
                                continue;

                        if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
                                return POOL_END;
                }

                /* wait for response except MASTER node */
                for (i=0;i<NUM_BACKENDS;i++)
                {
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
                                continue;

                        if (wait_for_query_response(CONNECTION(backend, i), string) != POOL_CONTINUE)
                                return POOL_END;
                }

                /* send "COMMIT" to master node if query is "COMMIT" */
                if (commit)
                {
                        if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
                                return POOL_END;

                        if (wait_for_query_response(MASTER(backend), string) != POOL_CONTINUE)
                                return POOL_END;

                        TSTATE(backend) = 'I';
                }
        }
        else
        {
                free_parser();
                if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
                        return POOL_END;

                if (wait_for_query_response(MASTER(backend), string) != POOL_CONTINUE)
                        return POOL_END;
        }

        return POOL_CONTINUE;
}

/*
 * process EXECUTE (V3 only)
 */
POOL_STATUS Execute(POOL_CONNECTION *frontend, 
                                                   POOL_CONNECTION_POOL 
*backend)
{
        char *string;           /* portal name + null terminate + 
max_tobe_returned_rows */
        int len;
        int i;
        char kind;
        int status, commit = 0;
        Portal *portal;
        char *string1;
        PrepareStmt *p_stmt;
        int deadlock_detected = 0;
        int serialization_error_detected = 0;
        int     active_sql_transaction_error = 0;
        POOL_STATUS ret;

        /* read Execute packet */
        if (pool_read(frontend, &len, sizeof(len)) < 0)
                return POOL_END;

        len = ntohl(len) - 4;
        string = pool_read2(frontend, len);

        pool_debug("Execute: portal name <%s>", string);

        if (receive_extended_begin)
        {
                /* send sync message */
                send_extended_protocol_message(backend, MASTER_NODE_ID, "S", 0, 
"");

                kind = pool_read_kind(backend);
                if (kind != 'Z')
                        return POOL_END;
                if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
                        return POOL_END;
        }

        portal = lookup_prepared_statement_by_portal(&prepared_list,
                                                                                
                 string);

        /* load balance trick */
        if (portal)
        {
                Node *node;

                p_stmt = (PrepareStmt *)portal->stmt;

                string1 = portal->sql_string;
                node = (Node *)p_stmt->query;

                if ((IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
                         IsA(node, VariableSetStmt)) &&
                        MASTER_SLAVE && TSTATE(backend) != 'E')
                {
                        force_replication = 1;
                }
                /*
                 * JDBC driver sends "BEGIN" query internally if 
setAutoCommit(false).
                 * But it does not send Sync message after "BEGIN" query.
                 * In extended query protocol, PostgreSQL returns
                 * ReadyForQuery when a client sends Sync message.
                 * We can't know a transaction state...
                 * So pgpool send Sync message internally.
                 */
                else if (IsA(node, TransactionStmt) && MASTER_SLAVE)
                {
                        TransactionStmt *stmt = (TransactionStmt *) node;

                        if (stmt->kind == TRANS_STMT_BEGIN ||
                                stmt->kind == TRANS_STMT_START)
                                receive_extended_begin = true;
                }

                if (load_balance_enabled(backend, node, string1))
                        start_load_balance(backend);
                else if (REPLICATION &&
                                 !pool_config->replicate_select &&
                                 is_select_query((Node *)p_stmt->query, 
string1) &&
                                 !is_sequence_query((Node *)p_stmt->query))
                {
                        selected_slot = MASTER_NODE_ID;
                        replication_was_enabled = 1;
                        REPLICATION = 0;
                        LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
                        in_load_balance = 1;
                        select_in_transaction = 1;
                        execute_select = 1;
                }
/*
                else if (REPLICATION && start_internal_transaction(backend, 
(Node *)p_stmt->query))
                {
                        return POOL_END;
                }
*/
                commit = is_commit_query((Node *)p_stmt->query);
        }

        if (MASTER_SLAVE)
        {
                master_slave_was_enabled = 1;
                MASTER_SLAVE = 0;
                master_slave_dml = 1;
                if (force_replication)
                {
                        replication_was_enabled = 0;
                        REPLICATION = 1;
                }
        }

        if (REPLICATION || PARALLEL_MODE)
        {
                /* send query to master node */
                if (!commit)
                {
                        if (send_execute_message(backend, MASTER_NODE_ID, len, 
string) != POOL_CONTINUE)
                                return POOL_END;

                        pool_debug("waiting for backend completing the query");
                        if (synchronize(CONNECTION(backend, MASTER_NODE_ID)))
                                return POOL_END;

                        /*
                         * Check dead lock error on the master node and abort
                         * transactions on all nodes if so.
                         */
                        deadlock_detected = 
detect_deadlock_error(MASTER(backend), MAJOR(backend));
                        if (deadlock_detected < 0)
                                return POOL_END;

                        /*
                         * Check serialization failure error and abort all nodes
                         * if so. Otherwise we allow data inconsistency among DB
                         * nodes. See following scenario: (M:master, S:slave)
                         */
                        serialization_error_detected = 
detect_serialization_error(MASTER(backend), MAJOR(backend));
                        if (serialization_error_detected < 0)
                                return POOL_END;

                        /*
                         * check "SET TRANSACTION ISOLATION LEVEL must be 
called before any query" error.
                         * This happens in following scenario:
                         * 
                         * M:S1:BEGIN;
                         * S:S1:BEGIN;
                         * M:S1:SELECT 1; <-- only sent to MASTER
                         * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
                         * M: <-- error
                         * S: <-- ok since no previous SELECT is sent. kind 
mismatch error occurs!
                         */
                        active_sql_transaction_error = 
detect_active_sql_transaction_error(MASTER(backend), MAJOR(backend));
                        if (active_sql_transaction_error < 0)
                                return POOL_END;

                }

                /* send query to other nodes */
                for (i=0;i<NUM_BACKENDS;i++)
                {
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
                                continue;
                        if (deadlock_detected == SPECIFIED_ERROR || 
serialization_error_detected == SPECIFIED_ERROR ||
                                active_sql_transaction_error == SPECIFIED_ERROR)
                        {
                                char msg[1024] = "pgpoool_error_portal"; /* 
large enough */
                                int len = strlen(msg);

                                if (deadlock_detected == SPECIFIED_ERROR)
                                        pool_log("Execute: received deadlock 
error message from master node. query: %s", string);
                                else if (serialization_error_detected == 
SPECIFIED_ERROR)
                                        pool_log("SimpleQuery: received 
serialization failure error message from master node. query: %s", string);
                                else
                                        pool_log("SimpleQuery: received SET 
TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s", 
string);

                                memset(msg + len, 0, sizeof(int));
                                if (send_execute_message(backend, i, len + 5, 
msg))
                                        return POOL_END;
                        }
                        else if (send_execute_message(backend, i, len, string) 
!= POOL_CONTINUE)
                                return POOL_END;
                }

                /* wait for all nodes except the master node */
                for (i=0;i<NUM_BACKENDS;i++)
                {
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
                                continue;

                        pool_debug("waiting for backend completing the query");
                        if (synchronize(CONNECTION(backend, i)))
                                return POOL_END;
                }

                if (commit)
                {
                        if (send_execute_message(backend, MASTER_NODE_ID, len, 
string) != POOL_CONTINUE)
                                return POOL_END;

                        pool_debug("waiting for backend completing the query");
                        if (synchronize(MASTER(backend)))
                                return POOL_END;
                }
        }
        else
        {
                if (send_execute_message(backend, MASTER_NODE_ID, len, string) 
!= POOL_CONTINUE)
                        return POOL_END;

                pool_debug("waiting for backend completing the query");
                if (synchronize(CONNECTION(backend, MASTER_NODE_ID)))
                        return POOL_END;
        }

        while ((ret = read_kind_from_backend(frontend, backend, &kind)) == 
POOL_CONTINUE)
        {
                /*
                 * forward message until receiving CommandComplete,
                 * ErrorResponse, EmptyQueryResponse or PortalSuspend.
                 */
                if (kind == 'C' || kind == 'E' || kind == 'I' || kind == 's')
                        break;

                status = SimpleForwardToFrontend(kind, frontend, backend);
                if (status != POOL_CONTINUE)
                        return status;
        }
        if (ret != POOL_CONTINUE)
                return ret;

        status = SimpleForwardToFrontend(kind, frontend, backend);
        if (status != POOL_CONTINUE)
                return status;

        return POOL_CONTINUE;
}

/*
 * Process Parse (V3 only).
 *
 * Reads a Parse message from the frontend.  The payload holds the
 * statement name followed by the query text, each NUL terminated.
 * If the query can be parsed, a portal holding a PrepareStmt built
 * from it is created and registered as "pending" (the pending
 * function is add_prepared_list for a named statement and
 * add_unnamed_portal for an unnamed one).
 *
 * The message is then sent to the master node and, in replication,
 * parallel or master/slave mode, to every other valid node after the
 * master has been synchronized and checked for a deadlock error.
 * Backend responses are forwarded to the frontend until a message
 * other than a notice ('N') has been forwarded.
 *
 * Returns POOL_CONTINUE on success; POOL_END, POOL_ERROR or the status
 * of a failing helper otherwise.
 */
POOL_STATUS Parse(POOL_CONNECTION *frontend,
                  POOL_CONNECTION_POOL *backend)
{
        char kind;
        int len;
        char *string;
        int i;
        Portal *portal;
        POOL_MEMORY_POOL *old_context;
        PrepareStmt *p_stmt;
        char *name, *stmt;
        List *parse_tree_list;
        Node *node = NULL;
        int deadlock_detected = 0;
        int insert_stmt_with_lock = 0;
        POOL_STATUS status;

        /* read Parse packet */
        if (pool_read(frontend, &len, sizeof(len)) < 0)
                return POOL_END;

        len = ntohl(len) - 4;
        string = pool_read2(frontend, len);
        /* pool_read2() returns NULL on failure; never dereference it */
        if (string == NULL)
                return POOL_END;

        pool_debug("Parse: portal name <%s>", string);

        /* payload layout: statement name, NUL, query string, NUL, ... */
        name = string;
        stmt = string + strlen(string) + 1;

        parse_tree_list = raw_parser(stmt);
        if (parse_tree_list == NIL)
        {
                free_parser();
        }
        else
        {
                node = (Node *) lfirst(list_head(parse_tree_list));

                insert_stmt_with_lock = need_insert_lock(backend, stmt, node);

                portal = create_portal();
                if (portal == NULL)
                {
                        pool_error("Parse: create_portal() failed");
                        return POOL_END;
                }

                /* switch memory context */
                old_context = pool_memory;
                pool_memory = portal->prepare_ctxt;

                /* translate Parse message to PrepareStmt */
                p_stmt = palloc(sizeof(PrepareStmt));
                p_stmt->type = T_PrepareStmt;
                p_stmt->name = pstrdup(name);
                p_stmt->query = copyObject(node);
                portal->stmt = (Node *)p_stmt;
                portal->portal_name = NULL;
                portal->sql_string = pstrdup(stmt);

                if (*name)
                {
                        pending_function = add_prepared_list;
                        pending_prepared_portal = portal;
                }
                else /* unnamed statement */
                {
                        pending_function = add_unnamed_portal;
                        pfree(p_stmt->name);
                        p_stmt->name = NULL;
                        pending_prepared_portal = portal;
                }

                /* switch old memory context */
                pool_memory = old_context;

                if (REPLICATION)
                {
                        /* distinct name: must not shadow the outer "kind" */
                        char ready_kind;

                        if (TSTATE(backend) != 'T')
                        {
                                /* synchronize transaction state */
                                for (i = 0; i < NUM_BACKENDS; i++)
                                {
                                        if (!VALID_BACKEND(i))
                                                continue;

                                        /* send sync message */
                                        send_extended_protocol_message(backend, i, "S", 0, "");
                                }

                                ready_kind = pool_read_kind(backend);
                                if (ready_kind != 'Z')
                                        return POOL_END;
                                if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
                                        return POOL_END;
                        }

                        if (is_strict_query(node))
                                start_internal_transaction(backend, node);

                        if (insert_stmt_with_lock)
                        {
                                /* start a transaction if needed and lock the table */
                                status = insert_lock(backend, stmt, (InsertStmt *)node);
                                if (status != POOL_CONTINUE)
                                {
                                        return status;
                                }
                        }
                }
                free_parser();
        }

        /* send to master node */
        if (send_extended_protocol_message(backend, MASTER_NODE_ID,
                                           "P", len, string))
                return POOL_END;

        if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE)
        {
                /*
                 * We must synchronize because Parse message acquires table
                 * locks.
                 */
                pool_debug("waiting for master completing the query");
                if (synchronize(MASTER(backend)))
                        return POOL_END;

                /*
                 * We must check for a deadlock error because a transaction
                 * aborted by deadlock detection is not necessarily aborted
                 * on all nodes.  If the transaction was aborted on the
                 * master node, pgpool sends an error query to the other
                 * nodes instead of the Parse message.
                 */
                deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
                if (deadlock_detected < 0)
                        return POOL_END;

                for (i = 0; i < NUM_BACKENDS; i++)
                {
                        if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
                        {
                                if (deadlock_detected)
                                {
                                        pool_log("Parse: received deadlock error message from master node");

                                        if (send_simplequery_message(CONNECTION(backend, i),
                                                                     strlen(POOL_ERROR_QUERY)+1,
                                                                     POOL_ERROR_QUERY,
                                                                     MAJOR(backend)))
                                                return POOL_END;
                                }
                                else if (send_extended_protocol_message(backend, i,
                                                                        "P", len, string))
                                        return POOL_END;
                        }
                }

                /* wait for DB nodes completing query except master node */
                for (i = 0; i < NUM_BACKENDS; i++)
                {
                        if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
                                continue;

                        pool_debug("waiting for %dth backend completing the query", i);
                        if (synchronize(CONNECTION(backend, i)))
                                return POOL_END;
                }
        }

        for (;;)
        {
                POOL_STATUS ret;
                ret = read_kind_from_backend(frontend, backend, &kind);

                if (ret != POOL_CONTINUE)
                        return ret;

                SimpleForwardToFrontend(kind, frontend, backend);
                if (pool_flush(frontend) < 0)
                        return POOL_ERROR;

                /* Ignore warning messages: keep reading after a notice */
                if (kind != 'N')
                        break;
        }
        return POOL_CONTINUE;
}

/*
 * Process ReadyForQuery('Z') message.
 *
 * - if the global error status "mismatch_ntuples" is set (V3 only), the
 *   pending ReadyForQuery packet is discarded, an error command is sent
 *   to all valid DB nodes to abort the transaction, and backend
 *   messages are discarded until the next 'Z' arrives.
 * - if an internal transaction was started and closing it is allowed,
 *   the internal transaction is closed.
 * - in V3 the transaction state byte is read and stored in the tstate
 *   field of every valid backend connection.
 * - if send_ready is non-zero, ReadyForQuery is sent to the frontend.
 * - load balance mode and the temporary master/slave DML override are
 *   ended, and the process title is updated.
 *
 * Returns POOL_CONTINUE on success, POOL_END or POOL_ERROR on failure.
 */
POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
                          POOL_CONNECTION_POOL *backend, int send_ready)
{
        StartupPacket *sp;
        char psbuf[1024];
        int i;
        int len;
        signed char state;

        /*
         * If the numbers of updated tuples differ, we need to abort the
         * transaction by using do_error_command. This only works with
         * PROTO_MAJOR_V3.
         */
        if (mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
        {
                char kind;

                /*
                 * XXX: discard rest of ReadyForQuery packet
                 */
                if (pool_read_message_length(backend) < 0)
                        return POOL_END;

                state = pool_read_kind(backend);
                if (state < 0)
                        return POOL_END;

                pool_debug("ReadyForQuery: transaction state: %c", state);

                for (i = 0; i < NUM_BACKENDS; i++)
                {
                        if (VALID_BACKEND(i))
                        {
                                /* abort transaction on all nodes. */
                                do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
                        }
                }

                /* loop through until we get ReadyForQuery */
                for (;;)
                {
                        kind = pool_read_kind(backend);
                        if (kind < 0)
                                return POOL_END;

                        if (kind == 'Z')
                                break;

                        /* put the message back to read buffer */
                        for (i = 0; i < NUM_BACKENDS; i++)
                        {
                                if (VALID_BACKEND(i))
                                {
                                        pool_unread(CONNECTION(backend,i), &kind, 1);
                                }
                        }

                        /* discard rest of the packet */
                        if (pool_discard_packet(backend) != POOL_CONTINUE)
                        {
                                pool_error("ReadyForQuery: pool_discard_packet failed");
                                return POOL_END;
                        }
                }
                mismatch_ntuples = 0;
        }

        /*
         * if a transaction is started for insert lock, we need to close
         * the transaction.
         */
        if (internal_transaction_started && allow_close_transaction)
        {
                if (MAJOR(backend) == PROTO_MAJOR_V3)
                {
                        /*
                         * len and state are read again below before they
                         * are used, so the values read here only feed the
                         * debug output.
                         */
                        if ((len = pool_read_message_length(backend)) < 0)
                                return POOL_END;

                        pool_debug("ReadyForQuery: message length: %d", len);

                        state = pool_read_kind(backend);
                        if (state < 0)
                                return POOL_END;

                        /* set transaction state */
                        pool_debug("ReadyForQuery: transaction state: %c", state);
                }

                if (end_internal_transaction(backend) != POOL_CONTINUE)
                        return POOL_ERROR;
        }

        if (MAJOR(backend) == PROTO_MAJOR_V3)
        {
                if ((len = pool_read_message_length(backend)) < 0)
                        return POOL_END;

                pool_debug("ReadyForQuery: message length: %d", len);

                /*
                 * Do not check transaction state in master/slave mode.
                 * Because SET, PREPARE, DEALLOCATE are replicated.
                 * If these queries are executed inside a transaction block,
                 * transaction state will be inconsistent. But it is no problem.
                 */
                if (master_slave_dml)
                {
                        char kind, other_kind;

                        if (pool_read(MASTER(backend), &kind, sizeof(kind)))
                                return POOL_END;

                        for (i = 0; i < NUM_BACKENDS; i++)
                        {
                                if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
                                        continue;

                                /* consume the state byte; its value is not compared */
                                if (pool_read(CONNECTION(backend, i), &other_kind, sizeof(other_kind)))
                                        return POOL_END;
                        }
                        state = kind;
                }
                else
                {
                        state = pool_read_kind(backend);
                        if (state < 0)
                                return POOL_END;
                }

                /* set transaction state */
                pool_debug("ReadyForQuery: transaction state: %c", state);

                for (i = 0; i < NUM_BACKENDS; i++)
                {
                        if (!VALID_BACKEND(i))
                                continue;

                        CONNECTION(backend, i)->tstate = state;
                }
        }

        if (send_ready)
        {
                pool_write(frontend, "Z", 1);

                if (MAJOR(backend) == PROTO_MAJOR_V3)
                {
                        len = htonl(len);
                        pool_write(frontend, &len, sizeof(len));
                        pool_write(frontend, &state, 1);
                }

                if (pool_flush(frontend))
                        return POOL_END;
        }

        in_progress = 0;

        /* end load balance mode */
        if (in_load_balance)
                end_load_balance(backend);

        /* undo the temporary master/slave override */
        if (master_slave_dml)
        {
                MASTER_SLAVE = 1;
                master_slave_was_enabled = 0;
                master_slave_dml = 0;
                if (force_replication)
                {
                        force_replication = 0;
                        REPLICATION = 0;
                        replication_was_enabled = 0;
                }
        }

#ifdef NOT_USED
        return ProcessFrontendResponse(frontend, backend);
#endif

        /* reflect the idle state in the process title */
        sp = MASTER_CONNECTION(backend)->sp;
        if (MASTER(backend)->tstate == 'T')
                snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction",
                                 sp->user, sp->database, remote_ps_data);
        else
                snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
                                 sp->user, sp->database, remote_ps_data);
        set_ps_display(psbuf, false);

        return POOL_CONTINUE;
}


/*
 * Queue "len" bytes starting at "data" for writing on every valid
 * backend connection.  The return value of pool_write() is not checked.
 */
static void forward_to_valid_backends(POOL_CONNECTION_POOL *backend,
                                      void *data, int len)
{
        int node;

        for (node = 0; node < NUM_BACKENDS; node++)
        {
                if (VALID_BACKEND(node))
                {
                        pool_write(CONNECTION(backend, node), data, len);
                }
        }
}

/*
 * Relay a FunctionCall ('F') message from the frontend to every valid
 * backend.
 *
 * The fields are read from the frontend and forwarded unchanged, in
 * this order: two dummy bytes, the function object id, the number of
 * arguments, and for each argument its length followed by its value.
 * Finally every valid backend connection is flushed.
 *
 * Returns POOL_CONTINUE on success, POOL_ERROR if reading from the
 * frontend or flushing a backend fails.
 */
POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
                         POOL_CONNECTION_POOL *backend)
{
        char dummy[2];
        int oid;
        int argn;
        int i;

        forward_to_valid_backends(backend, "F", 1);

        /* dummy */
        if (pool_read(frontend, dummy, sizeof(dummy)) < 0)
                return POOL_ERROR;
        forward_to_valid_backends(backend, dummy, sizeof(dummy));

        /* function object id */
        if (pool_read(frontend, &oid, sizeof(oid)) < 0)
                return POOL_ERROR;
        forward_to_valid_backends(backend, &oid, sizeof(oid));

        /* number of arguments (forwarded in network byte order) */
        if (pool_read(frontend, &argn, sizeof(argn)) < 0)
                return POOL_ERROR;
        forward_to_valid_backends(backend, &argn, sizeof(argn));

        argn = ntohl(argn);

        /*
         * "i" counts arguments only.  The per-backend iteration lives in
         * forward_to_valid_backends() with its own index, so the argument
         * counter is no longer clobbered while forwarding.
         */
        for (i = 0; i < argn; i++)
        {
                int len;
                char *arg;

                /* length of each argument in bytes */
                if (pool_read(frontend, &len, sizeof(len)) < 0)
                        return POOL_ERROR;
                forward_to_valid_backends(backend, &len, sizeof(len));

                len = ntohl(len);

                /* argument value itself */
                if ((arg = pool_read2(frontend, len)) == NULL)
                        return POOL_ERROR;
                forward_to_valid_backends(backend, arg, len);
        }

        for (i = 0; i < NUM_BACKENDS; i++)
        {
                if (VALID_BACKEND(i))
                {
                        if (pool_flush(CONNECTION(backend, i)))
                                return POOL_ERROR;
                }
        }
        return POOL_CONTINUE;
}

/*
 * Relay a FunctionResultResponse ('V') from the backends to the
 * frontend.
 *
 * Each field (result marker byte, and for a non-empty result ('G') the
 * length and the value, then the trailing unused byte) is read from
 * every valid backend so that all connections are drained in step.
 * The value forwarded to the frontend is the one read from the last
 * valid backend.
 *
 * Returns the result of pool_flush(frontend) on success, POOL_ERROR if
 * reading from a backend fails.
 */
POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend,
                                   POOL_CONNECTION_POOL *backend)
{
        char dummy;
        int len;
        char *result = 0;
        int i;

        pool_write(frontend, "V", 1);

        for (i = 0; i < NUM_BACKENDS; i++)
        {
                if (VALID_BACKEND(i))
                {
                        if (pool_read(CONNECTION(backend, i), &dummy, 1) < 0)
                                return POOL_ERROR;
                }
        }
        pool_write(frontend, &dummy, 1);

        /* non empty result? */
        if (dummy == 'G')
        {
                for (i = 0; i < NUM_BACKENDS; i++)
                {
                        if (VALID_BACKEND(i))
                        {
                                /* length of result in bytes */
                                if (pool_read(CONNECTION(backend, i), &len, sizeof(len)) < 0)
                                        return POOL_ERROR;
                        }
                }
                /* forward the length as received (network byte order) */
                pool_write(frontend, &len, sizeof(len));

                len = ntohl(len);

                for (i = 0; i < NUM_BACKENDS; i++)
                {
                        if (VALID_BACKEND(i))
                        {
                                /*
                                 * result value itself: read it from node i,
                                 * not repeatedly from the master, so that
                                 * every backend's copy is consumed.
                                 */
                                if ((result = pool_read2(CONNECTION(backend, i), len)) == NULL)
                                        return POOL_ERROR;
                        }
                }
                pool_write(frontend, result, len);
        }

        for (i = 0; i < NUM_BACKENDS; i++)
        {
                if (VALID_BACKEND(i))
                {
                        /* unused ('0'), consumed from node i */
                        if (pool_read(CONNECTION(backend, i), &dummy, 1) < 0)
                                return POOL_ERROR;
                }
        }
        pool_write(frontend, "0", 1);

        return pool_flush(frontend);
}

/*
 * Read one message kind byte from the frontend and dispatch on it.
 *
 *   'X'   Terminate: in V3 the length word is consumed; returns POOL_END.
 *   'Q'   simple query   -> SimpleQuery()
 *   'E'   Execute        -> Execute()
 *   'P'   Parse          -> Parse()
 *   'S'   clears receive_extended_begin, then handled like "other".
 *   other V3: forwarded to the backends with SimpleForwardToBackend()
 *         and every valid backend is flushed.  In master/slave mode,
 *         when the transaction state is not 'I' or an extended-protocol
 *         BEGIN has been received, master/slave mode is temporarily
 *         switched off (master_slave_dml is set).
 *         V2: only 'F' (FunctionCall) is accepted.
 *
 * Returns POOL_CONTINUE immediately if the frontend has no buffered
 * data and no_forward is set.  Any handler status other than
 * POOL_CONTINUE is reported as POOL_ERROR.
 */
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
                                    POOL_CONNECTION_POOL *backend)
{
        char fkind;
        POOL_STATUS status;
        int i;

        if (frontend->len <= 0 && frontend->no_forward != 0)
                return POOL_CONTINUE;

        if (pool_read(frontend, &fkind, 1) < 0)
        {
                pool_log("ProcessFrontendResponse: failed to read kind from frontend. frontend abnormally exited");
                return POOL_END;
        }

        pool_debug("read kind from frontend %c(%02x)", fkind, fkind);

        switch (fkind)
        {

                case 'X':  /* Terminate message*/
                        if (MAJOR(backend) == PROTO_MAJOR_V3)
                        {
                                int len;
                                /* the session ends either way; result not checked */
                                pool_read(frontend, &len, sizeof(len));
                        }
                        return POOL_END;

                case 'Q':  /* Query message*/
                        in_progress = 1;
                        allow_close_transaction = 1;
                        status = SimpleQuery(frontend, backend, NULL);
                        break;

                case 'E':  /* Execute message */
                        allow_close_transaction = 1;
                        status = Execute(frontend, backend);
                        break;

                case 'P':  /* Parse message */
                        allow_close_transaction = 0;
                        status = Parse(frontend, backend);
                        break;

                case 'S':
                        receive_extended_begin = 0;
                        /* fall through */

                default:
                        if (MAJOR(backend) == PROTO_MAJOR_V3)
                        {
                                if (MASTER_SLAVE &&
                                        (TSTATE(backend) != 'I' || receive_extended_begin))
                                {
                                        pool_debug("kind: %c master_slave_dml enabled", fkind);
                                        master_slave_was_enabled = 1;
                                        MASTER_SLAVE = 0;
                                        master_slave_dml = 1;
                                }

                                status = SimpleForwardToBackend(fkind, frontend, backend);
                                for (i = 0; i < NUM_BACKENDS; i++)
                                {
                                        if (VALID_BACKEND(i))
                                        {
                                                if (pool_flush(CONNECTION(backend, i)))
                                                        status = POOL_ERROR;
                                        }
                                }
                        }
                        else if (MAJOR(backend) == PROTO_MAJOR_V2 && fkind == 'F')
                                status = FunctionCall(frontend, backend);
                        else
                        {
                                pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);
                                status = POOL_ERROR;
                        }
                        break;
        }

        /* collapse every failure status into POOL_ERROR for the caller */
        if (status != POOL_CONTINUE)
                status = POOL_ERROR;
        return status;
}

/*
 * Relay a CompleteCommandResponse ('C') to the frontend.
 *
 * The command tag is read from the master node and from every other
 * valid node.  Only the tag lengths are compared; if a node's length
 * differs from the master's, the mismatch is logged at debug level and
 * POOL_END is returned.  Otherwise 'C' followed by the master's tag is
 * written to the frontend and flushed.
 *
 * Returns the result of pool_flush(frontend) on success, POOL_END on
 * read/write failure, allocation failure or length mismatch.
 */
POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend,
                                    POOL_CONNECTION_POOL *backend)
{
        int i;
        char *string = NULL;
        char *string1 = NULL;
        int len, len1 = 0;

        /* read command tag */
        string = pool_read_string(MASTER(backend), &len, 0);
        if (string == NULL)
                return POOL_END;
        len1 = len;

        /* keep a private copy of the master's tag while other nodes are read */
        string1 = strdup(string);
        if (string1 == NULL)
        {
                pool_error("CompleteCommandResponse: strdup failed");
                return POOL_END;
        }

        for (i = 0; i < NUM_BACKENDS; i++)
        {
                if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i))
                        continue;

                /* read command tag */
                string = pool_read_string(CONNECTION(backend, i), &len, 0);
                if (string == NULL)
                {
                        free(string1);
                        return POOL_END;
                }

                if (len != len1)
                {
                        /* master's values first, then node index, then node i's */
                        pool_debug("Complete Command Response: message length does not match between master(%d \"%s\",) and %d th server (%d \"%s\",)",
                                           len1, string1, i, len, string);

                        free(string1);
                        return POOL_END;
                }
        }
        /* forward to the frontend */
        pool_write(frontend, "C", 1);
        pool_debug("Complete Command Response: string: \"%s\"", string1);
        if (pool_write(frontend, string1, len1) < 0)
        {
                free(string1);
                return POOL_END;
        }

        free(string1);
        return pool_flush(frontend);
}

/*
 * RowDescription: relay a RowDescription ("T") message.
 *
 * Reads the field count and then, for every field, the field name, type
 * oid, type size and type modifier from the master and from each other
 * valid backend. The master's values are forwarded to the frontend.
 *
 * A mismatch in field count, field name length or type size is reported
 * with pool_error() and ends the function with POOL_FATAL. A mismatch
 * in type oid or type modifier is only logged with pool_debug().
 *
 * frontend: connection the message is written to.
 * backend:  connection pool the message is read from.
 * result:   out parameter; receives the number of fields in host byte
 *           order.
 *
 * Returns the result of pool_flush(frontend) on success, POOL_END when
 * a read or a checked write fails, POOL_FATAL on the mismatches above.
 */
int RowDescription(POOL_CONNECTION *frontend,
				   POOL_CONNECTION_POOL *backend,
				   short *result)
{
	short num_fields, num_fields1 = 0;
	int oid, mod;
	int oid1, mod1;
	short size, size1;
	char *string;
	int len, len1;
	int i;

	/* # of fields (could be 0); kept in network byte order until forwarded */
	if (pool_read(MASTER(backend), &num_fields, sizeof(short)) < 0)
		return POOL_END;
	num_fields1 = num_fields;
	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
		{
			if (pool_read(CONNECTION(backend, i), &num_fields, sizeof(short)) < 0)
				return POOL_END;
			if (num_fields != num_fields1)
			{
				pool_error("RowDescription: num_fields does not match between backends master(%d) and %d th backend(%d)",
						   ntohs(num_fields1), i, ntohs(num_fields));
				return POOL_FATAL;
			}
		}
	}

	/* forward it to the frontend */
	pool_write(frontend, "T", 1);
	pool_write(frontend, &num_fields1, sizeof(short));
	num_fields = ntohs(num_fields1);
	for (i = 0;i<num_fields;i++)
	{
		int j;

		/* field name: forward the master's before reading the others */
		string = pool_read_string(MASTER(backend), &len, 0);
		if (string == NULL)
			return POOL_END;
		len1 = len;
		if (pool_write(frontend, string, len) < 0)
			return POOL_END;

		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				string = pool_read_string(CONNECTION(backend, j), &len, 0);
				if (string == NULL)
					return POOL_END;

				if (len != len1)
				{
					/* len and len1 are host-order byte counts: no ntohl here */
					pool_error("RowDescription: field length does not match between backends master(%d) and %d th backend(%d)",
							   len1, j, len);
					return POOL_FATAL;
				}
			}
		}

		/* type oid */
		if (pool_read(MASTER(backend), &oid, sizeof(int)) < 0)
			return POOL_END;
		oid1 = oid;
		pool_debug("RowDescription: type oid: %d", ntohl(oid1));
		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				if (pool_read(CONNECTION(backend, j), &oid, sizeof(int)) < 0)
					return POOL_END;

				/* we do not regard oid mismatch as fatal */
				if (oid != oid1)
				{
					pool_debug("RowDescription: field oid does not match between backends master(%d) and %d th backend(%d)",
							   ntohl(oid1), j, ntohl(oid));
				}
			}
		}
		if (pool_write(frontend, &oid1, sizeof(int)) < 0)
			return POOL_END;

		/* size */
		if (pool_read(MASTER(backend), &size, sizeof(short)) < 0)
			return POOL_END;
		size1 = size;
		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				if (pool_read(CONNECTION(backend, j), &size, sizeof(short)) < 0)
					return POOL_END;

				/* compare this backend's size against the master's */
				if (size != size1)
				{
					pool_error("RowDescription: field size does not match between backends master(%d) and %d th backend(%d)",
							   ntohs(size1), j, ntohs(size));
					return POOL_FATAL;
				}
			}
		}
		pool_debug("RowDescription: field size: %d", ntohs(size1));
		pool_write(frontend, &size1, sizeof(short));

		/* modifier (32 bits wide, hence ntohl) */
		if (pool_read(MASTER(backend), &mod, sizeof(int)) < 0)
			return POOL_END;
		pool_debug("RowDescription: modifier: %d", ntohl(mod));
		mod1 = mod;
		for (j=0;j<NUM_BACKENDS;j++)
		{
			if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
			{
				if (pool_read(CONNECTION(backend, j), &mod, sizeof(int)) < 0)
					return POOL_END;

				/* a modifier mismatch is only logged, like the oid */
				if (mod != mod1)
				{
					pool_debug("RowDescription: modifier does not match between backends master(%d) and %d th backend(%d)",
							   ntohl(mod1), j, ntohl(mod));
				}
			}
		}
		if (pool_write(frontend, &mod1, sizeof(int)) < 0)
			return POOL_END;
	}

	*result = num_fields;

	return pool_flush(frontend);
}

/*
 * AsciiRow: relay one AsciiRow ("D") message.
 *
 * Reads the NULL bitmap and every non-NULL field from the master and
 * from each other valid backend. Only the master's bitmap and field
 * data are forwarded to the frontend; data from the other backends is
 * read and dropped. Differences between backends (bitmap or field
 * size) are only logged with pool_debug().
 *
 * frontend:   connection the row is written to.
 * backend:    connection pool the row is read from.
 * num_fields: number of fields in the row; the bitmap occupies
 *             (num_fields + 7) / 8 bytes.
 *
 * Returns POOL_CONTINUE on success (also when the bitmap size is not
 * positive, in which case only the "D" byte has been written and no
 * flush happens), POOL_END when a read, the bitmap write or the final
 * flush fails.
 */
POOL_STATUS AsciiRow(POOL_CONNECTION *frontend,
					 POOL_CONNECTION_POOL *backend,
					 short num_fields)
{
	/* nullmap1 holds the master's bitmap, nullmap is scratch for the others */
	static char nullmap[8192], nullmap1[8192];
	int nbytes;
	int i, j;
	unsigned char mask;
	int size, size1 = 0;
	char *sendbuf = NULL;
	char msgbuf[1024];

	pool_write(frontend, "D", 1);

	nbytes = (num_fields + 7)/8;

	if (nbytes <= 0)
		return POOL_CONTINUE;

	/* NULL map */
	if (pool_read(MASTER(backend), nullmap1, nbytes) < 0)
		return POOL_END;
	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
		{
			if (pool_read(CONNECTION(backend, i), nullmap, nbytes) < 0)
				return POOL_END;
			if (memcmp(nullmap, nullmap1, nbytes))
			{
				/*
				 * XXX: the NULL map may differ among backends. A paranoid
				 * implementation would treat this as a fatal error; in
				 * practice we adapt and just log it.
				 */
				pool_debug("AsciiRow: NULLMAP differ between master and %d th backend", i);
			}
		}
	}

	if (pool_write(frontend, nullmap1, nbytes) < 0)
		return POOL_END;

	mask = 0;

	for (i = 0;i<num_fields;i++)
	{
		if (mask == 0)
			mask = 0x80;

		/*
		 * NOT NULL? Decide from the master's bitmap, which is the one the
		 * frontend received; "nullmap" holds whatever backend was read last.
		 */
		if (mask & nullmap1[i/8])
		{
			/* field size, as sent on the wire */
			if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
				return POOL_END;

			size1 = ntohl(size) - 4;

			/* forward to frontend */
			pool_write(frontend, &size, sizeof(int));

			/* read and send actual data only when size > 0 */
			msgbuf[0] = '\0';
			if (size1 > 0)
			{
				int shown = (int) sizeof(msgbuf) - 1;

				sendbuf = pool_read2(MASTER(backend), size1);
				if (sendbuf == NULL)
					return POOL_END;
				pool_write(frontend, sendbuf, size1);

				/*
				 * Bound the copy with a precision: the field data is not
				 * known to be NUL-terminated.
				 */
				if (size1 < shown)
					shown = size1;
				snprintf(msgbuf, sizeof(msgbuf), "%.*s", shown, sendbuf);
			}
			pool_debug("AsciiRow: len: %d data: %s", size1, msgbuf);

			for (j=0;j<NUM_BACKENDS;j++)
			{
				if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
				{
					/* field size */
					if (pool_read(CONNECTION(backend, j), &size, sizeof(int)) < 0)
						return POOL_END;

					size = ntohl(size) - 4;

					/*
					 * XXX: the field size may differ among backends. A
					 * paranoid implementation would treat this as a fatal
					 * error; in practice we adapt and just log it. Both
					 * values are already in host byte order here.
					 */
					if (size != size1)
						pool_debug("AsciiRow: %d th field size does not match between master(%d) and %d th backend(%d)",
								   i, size1, j, size);

					/* consume this backend's data; it is not forwarded */
					if (size > 0 &&
						pool_read2(CONNECTION(backend, j), size) == NULL)
						return POOL_END;
				}
			}
		}

		mask >>= 1;
	}

	if (pool_flush(frontend))
		return POOL_END;

	return POOL_CONTINUE;
}

/*
 * BinaryRow: relay one BinaryRow ("B") message.
 *
 * Reads the NULL bitmap and every non-NULL field from the master and
 * from each other valid backend. Only the master's bitmap and field
 * data are forwarded to the frontend; data from the other backends is
 * read and dropped. Differences between backends (bitmap or field
 * size) are only logged with pool_debug().
 *
 * frontend:   connection the row is written to.
 * backend:    connection pool the row is read from.
 * num_fields: number of fields in the row; the bitmap occupies
 *             (num_fields + 7) / 8 bytes.
 *
 * Returns POOL_CONTINUE on success (also when the bitmap size is not
 * positive, in which case only the "B" byte has been written and no
 * flush happens), POOL_END when a read, the bitmap write or the final
 * flush fails.
 */
POOL_STATUS BinaryRow(POOL_CONNECTION *frontend,
					  POOL_CONNECTION_POOL *backend,
					  short num_fields)
{
	/* nullmap1 holds the master's bitmap, nullmap is scratch for the others */
	static char nullmap[8192], nullmap1[8192];
	int nbytes;
	int i, j;
	unsigned char mask;
	int size, size1 = 0;
	char *buf = NULL;

	pool_write(frontend, "B", 1);

	nbytes = (num_fields + 7)/8;

	if (nbytes <= 0)
		return POOL_CONTINUE;

	/* NULL map */
	if (pool_read(MASTER(backend), nullmap1, nbytes) < 0)
		return POOL_END;
	if (pool_write(frontend, nullmap1, nbytes) < 0)
		return POOL_END;
	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
		{
			if (pool_read(CONNECTION(backend, i), nullmap, nbytes) < 0)
				return POOL_END;
			if (memcmp(nullmap, nullmap1, nbytes))
			{
				/*
				 * XXX: the NULL map may differ among backends. A paranoid
				 * implementation would treat this as a fatal error; in
				 * practice we adapt and just log it.
				 */
				pool_debug("BinaryRow: NULLMAP differ between master and %d th backend", i);
			}
		}
	}

	mask = 0;

	for (i = 0;i<num_fields;i++)
	{
		if (mask == 0)
			mask = 0x80;

		/* NOT NULL? Decide from the master's bitmap. */
		if (mask & nullmap1[i/8])
		{
			/* field size from the master, forwarded as received */
			if (pool_read(MASTER(backend), &size, sizeof(int)) < 0)
				return POOL_END;
			pool_write(frontend, &size, sizeof(int));

			/*
			 * NOTE(review): the "- 4" is kept from the original code. The
			 * PostgreSQL 2.0 protocol description should be checked for
			 * whether a BinaryRow field size includes the size word itself.
			 */
			size1 = ntohl(size) - 4;

			/* read and send actual data only when size > 0 */
			if (size1 > 0)
			{
				buf = pool_read2(MASTER(backend), size1);
				if (buf == NULL)
					return POOL_END;
				pool_write(frontend, buf, size1);
			}

			/* only valid, non-master backends are touched here */
			for (j=0;j<NUM_BACKENDS;j++)
			{
				if (VALID_BACKEND(j) && !IS_MASTER_NODE_ID(j))
				{
					/* field size of backend j (not of field index i) */
					if (pool_read(CONNECTION(backend, j), &size, sizeof(int)) < 0)
						return POOL_END;

					size = ntohl(size) - 4;

					/*
					 * XXX: the field size may differ among backends. A
					 * paranoid implementation would treat this as a fatal
					 * error; in practice we adapt and just log it. Both
					 * values are already in host byte order here.
					 */
					if (size != size1)
						pool_debug("BinaryRow: %d th field size does not match between master(%d) and %d th backend(%d)",
								   i, size1, j, size);

					/* consume this backend's data; it is not forwarded */
					if (size > 0 &&
						pool_read2(CONNECTION(backend, j), size) == NULL)
						return POOL_END;
				}
			}
		}

		/* advance to the next bitmap bit for every field, NULL or not */
		mask >>= 1;
	}

	if (pool_flush(frontend))
		return POOL_END;

	return POOL_CONTINUE;
}

/*
 * CursorResponse: relay a CursorResponse ("P") message.
 *
 * Reads the cursor name from the master and from every other valid
 * backend, then forwards the master's name to the frontend.
 *
 * frontend: connection the "P" message is written to.
 * backend:  connection pool the cursor names are read from.
 *
 * Returns POOL_CONTINUE on success. Returns POOL_END if a read, the
 * name write or the flush fails, if the private copy of the name cannot
 * be allocated, or if a backend reports a name whose length differs
 * from the master's.
 */
POOL_STATUS CursorResponse(POOL_CONNECTION *frontend,
						   POOL_CONNECTION_POOL *backend)
{
	char *string = NULL;
	char *string1 = NULL;
	int len, len1 = 0;
	int i;

	/* read cursor name from the master */
	string = pool_read_string(MASTER(backend), &len, 0);
	if (string == NULL)
		return POOL_END;
	len1 = len;

	/* private copy of the master's name; "string" is reassigned below */
	string1 = strdup(string);
	if (string1 == NULL)
	{
		pool_error("CursorResponse: strdup failed");
		return POOL_END;
	}

	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
		{
			/* read cursor name */
			string = pool_read_string(CONNECTION(backend, i), &len, 0);
			if (string == NULL)
			{
				/* do not leak the master's copy on the error path */
				free(string1);
				return POOL_END;
			}
			if (len != len1)
			{
				pool_error("CursorResponse: length does not match between master(%d) and %d th backend(%d)",
						   len1, i, len);
				pool_error("CursorResponse: master(%s) %d th backend(%s)", string1, i, string);
				free(string1);
				return POOL_END;
			}
		}
	}

	/* forward to the frontend */
	pool_write(frontend, "P", 1);
	if (pool_write(frontend, string1, len1) < 0)
	{
		free(string1);
		return POOL_END;
	}
	free(string1);

	if (pool_flush(frontend))
		return POOL_END;

	return POOL_CONTINUE;
}

/*
 * ErrorResponse: relay an ErrorResponse ("E") message.
 *
 * The error text is read from every valid backend; the text read last
 * is the one forwarded to the frontend. Afterwards the transaction
 * state is moved from 'T' to 'E', or set to 'I' from any other state.
 *
 * Returns POOL_CONTINUE on success, POOL_END when a read or the
 * write-and-flush to the frontend fails.
 */
POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
						  POOL_CONNECTION_POOL *backend)
{
	char *errmsg = NULL;
	int errlen;
	int node;

	for (node = 0; node < NUM_BACKENDS; node++)
	{
		if (!VALID_BACKEND(node))
			continue;

		/* each read replaces the message picked up from the previous node */
		errmsg = pool_read_string(CONNECTION(backend, node), &errlen, 0);
		if (errmsg == NULL)
			return POOL_END;
	}

	/* hand the message over to the frontend */
	pool_write(frontend, "E", 1);
	if (pool_write_and_flush(frontend, errmsg, errlen) < 0)
		return POOL_END;

	/* an error inside a transaction block fails it; otherwise we are idle */
	TSTATE(backend) = (TSTATE(backend) == 'T') ? 'E' : 'I';

	return POOL_CONTINUE;
}

/*
 * NoticeResponse: relay a NoticeResponse ("N") message.
 *
 * The notice text is read from every valid backend; the text read last
 * is the one forwarded to the frontend.
 *
 * Returns POOL_CONTINUE on success, POOL_END when a read or the
 * write-and-flush to the frontend fails.
 */
POOL_STATUS NoticeResponse(POOL_CONNECTION *frontend,
						   POOL_CONNECTION_POOL *backend)
{
	char *notice = NULL;
	int notice_len;
	int node;

	for (node = 0; node < NUM_BACKENDS; node++)
	{
		if (!VALID_BACKEND(node))
			continue;

		/* each read replaces the message picked up from the previous node */
		notice = pool_read_string(CONNECTION(backend, node), &notice_len, 0);
		if (notice == NULL)
			return POOL_END;
	}

	/* hand the message over to the frontend */
	pool_write(frontend, "N", 1);

	return (pool_write_and_flush(frontend, notice, notice_len) < 0)
		? POOL_END
		: POOL_CONTINUE;
}

/*
 * CopyInResponse: relay a CopyInResponse ("G") message and then run
 * the copy-in data loop.
 *
 * With the V3 protocol the message is forwarded through
 * SimpleForwardToFrontend() and flushed; otherwise the single byte "G"
 * is written and flushed.
 *
 * Returns POOL_END when forwarding fails, otherwise whatever
 * CopyDataRows(frontend, backend, 1) returns.
 */
POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend,
						   POOL_CONNECTION_POOL *backend)
{
	if (MAJOR(backend) != PROTO_MAJOR_V3)
	{
		/* older protocol: the response is just the kind byte */
		if (pool_write_and_flush(frontend, "G", 1) < 0)
			return POOL_END;
	}
	else if (SimpleForwardToFrontend('G', frontend, backend) != POOL_CONTINUE ||
			 pool_flush(frontend) != POOL_CONTINUE)
	{
		return POOL_END;
	}

	/* third argument 1 selects the copy-in direction */
	return CopyDataRows(frontend, backend, 1);
}

/*
 * CopyOutResponse: relay a CopyOutResponse ("H") message and then run
 * the copy-out data loop.
 *
 * With the V3 protocol the message is forwarded through
 * SimpleForwardToFrontend() and flushed; otherwise the single byte "H"
 * is written and flushed.
 *
 * Returns POOL_END when forwarding fails, otherwise whatever
 * CopyDataRows(frontend, backend, 0) returns.
 */
POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend,
							POOL_CONNECTION_POOL *backend)
{
	if (MAJOR(backend) != PROTO_MAJOR_V3)
	{
		/* older protocol: the response is just the kind byte */
		if (pool_write_and_flush(frontend, "H", 1) < 0)
			return POOL_END;
	}
	else if (SimpleForwardToFrontend('H', frontend, backend) != POOL_CONTINUE ||
			 pool_flush(frontend) != POOL_CONTINUE)
	{
		return POOL_END;
	}

	/* third argument 0 selects the copy-out direction */
	return CopyDataRows(frontend, backend, 0);
}

/*
 * CopyDataRows: move COPY data between the frontend and the backends
 * until the copy ends.
 *
 * frontend: client connection.
 * backend:  connection pool.
 * copyin:   non-zero for COPY IN (frontend -> backends), zero for
 *           COPY OUT (backends -> frontend).
 *
 * COPY IN, V3 protocol: each message kind is read from the frontend.
 * When parallel mode is on and distribution info was found for the copy
 * target, a 'd' (CopyData) row is parsed for its distribution key and
 * written only to the backend pool_get_id() selects; the end marker
 * "\\.\n" is written to every valid backend. All other messages go
 * through SimpleForwardToBackend(). The loop ends on the first kind
 * other than 'd'.
 *
 * COPY OUT, V3 protocol: each message is passed on with
 * SimpleForwardToFrontend() until a kind other than 'd' is seen.
 *
 * Older protocol: data is handled line by line until the "\\.\n" end
 * marker.
 *
 * After the loop, COPY IN flushes and synchronizes every valid backend;
 * COPY OUT flushes the frontend.
 *
 * Returns POOL_CONTINUE on success (also when a parallel-mode CopyData
 * message carries no payload), POOL_END on I/O or parse failure. The
 * process exits if the distribution key maps to a backend that is not
 * valid.
 */
POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend,
						 POOL_CONNECTION_POOL *backend, int copyin)
{
	char *string = NULL;
	int len;
	int i;
	DistDefInfo *info = NULL;

#ifdef DEBUG
	int j = 0;
	char buf[1024];
#endif

	if (copyin && pool_config->parallel_mode == TRUE)
	{
		info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database,
									  copy_schema,
									  copy_table);
	}

	for (;;)
	{
		if (copyin)
		{
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				char kind;
				int sendlen;
				char *p, *p1;

				if (pool_read(frontend, &kind, 1) < 0)
					return POOL_END;

				if (info && kind == 'd')
				{
					int id;
					if (pool_read(frontend, &sendlen, sizeof(sendlen)))
					{
						return POOL_END;
					}

					/* the length word counts itself */
					len = ntohl(sendlen) - 4;

					if (len <= 0)
						return POOL_CONTINUE;

					p = pool_read2(frontend, len);
					if (p == NULL)
						return POOL_END;

					/* copy end ? */
					if (len == 3 && memcmp(p, "\\.\n", 3) == 0)
					{
						/* the end marker has to reach every backend */
						for (i=0;i<NUM_BACKENDS;i++)
						{
							if (VALID_BACKEND(i))
							{
								if (pool_write(CONNECTION(backend, i), &kind, 1))
									return POOL_END;
								if (pool_write(CONNECTION(backend, i), &sendlen, sizeof(sendlen)))
									return POOL_END;
								if (pool_write(CONNECTION(backend, i), p, len))
									return POOL_END;
							}
						}
					}
					else
					{
						/* extract the distribution key column from the row */
						p1 = parse_copy_data(p, len, copy_delimiter, info->dist_key_col_id);

						if (!p1)
						{
							pool_error("CopyDataRow: cannot parse data");
							return POOL_END;
						}
						else if (strcmp(p1, copy_null) == 0)
						{
							pool_error("CopyDataRow: key parameter is NULL");
							free(p1);
							return POOL_END;
						}

						id = pool_get_id(info, p1);
						pool_debug("CopyDataRow: copying id: %d", id);
						free(p1);
						if (!VALID_BACKEND(id))
						{
							/* say why the process is going away */
							pool_error("CopyDataRow: invalid backend id: %d", id);
							exit(1);
						}
						if (pool_write(CONNECTION(backend, id), &kind, 1))
						{
							return POOL_END;
						}
						if (pool_write(CONNECTION(backend, id), &sendlen, sizeof(sendlen)))
						{
							return POOL_END;
						}
						if (pool_write_and_flush(CONNECTION(backend, id), p, len))
						{
							return POOL_END;
						}
					}
				}
				else
				{
					SimpleForwardToBackend(kind, frontend, backend);
				}

				/* CopyData? */
				if (kind == 'd')
					continue;
				else
				{
					pool_debug("CopyDataRows: copyin kind other than d (%c)", kind);
					break;
				}
			}
			else
				string = pool_read_string(frontend, &len, 1);
		}
		else
		{
			/* CopyOut */
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				signed char kind;

				if ((kind = pool_read_kind(backend)) < 0)
					return POOL_END;

				SimpleForwardToFrontend(kind, frontend, backend);

				/* CopyData? */
				if (kind == 'd')
					continue;
				else
					break;
			}
			else
			{
				/* every valid backend is read; the last line read is kept */
				for (i=0;i<NUM_BACKENDS;i++)
				{
					if (VALID_BACKEND(i))
					{
						string = pool_read_string(CONNECTION(backend, i), &len, 1);
					}
				}
			}
		}

		if (string == NULL)
			return POOL_END;

#ifdef DEBUG
		/* bounded and always terminated, unlike strncpy(buf, string, len) */
		snprintf(buf, sizeof(buf), "%.*s", len < 0 ? 0 : len, string);
		pool_debug("copy line %d %d bytes :%s:", j++, len, buf);
#endif

		if (copyin)
		{
			for (i=0;i<NUM_BACKENDS;i++)
			{
				if (VALID_BACKEND(i))
				{
					pool_write(CONNECTION(backend, i), string, len);
				}
			}
		}
		else
			pool_write(frontend, string, len);

		/*
		 * The end marker "\\.\n" is exactly 3 bytes long. The original
		 * compared len against PROTO_MAJOR_V3, which is a protocol version
		 * and not a length (presumably it only worked because that constant
		 * is 3 -- TODO confirm).
		 */
		if (len == 3)
		{
			/* end of copy? */
			if (string[0] == '\\' &&
				string[1] == '.' &&
				string[2] == '\n')
			{
				break;
			}
		}
	}

	if (copyin)
	{
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (VALID_BACKEND(i))
			{
				if (pool_flush(CONNECTION(backend, i)) <0)
					return POOL_END;

				if (synchronize(CONNECTION(backend, i)))
					return POOL_END;
			}
		}
	}
	else
		if (pool_flush(frontend) <0)
			return POOL_END;

	return POOL_CONTINUE;
}

/*
 * EmptyQueryResponse: relay an EmptyQueryResponse ("I") message.
 *
 * One byte is read from every valid backend and dropped; then "I"
 * followed by a single NUL byte is written to the frontend and flushed.
 *
 * Returns POOL_END when a backend read fails, otherwise the result of
 * pool_write_and_flush().
 */
POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend,
							   POOL_CONNECTION_POOL *backend)
{
	char discard;
	int node = 0;

	while (node < NUM_BACKENDS)
	{
		/* consume the one-byte body from each live backend */
		if (VALID_BACKEND(node) &&
			pool_read(CONNECTION(backend, node), &discard, sizeof(discard)) < 0)
		{
			return POOL_END;
		}
		node++;
	}

	pool_write(frontend, "I", 1);

	/* length 1 on an empty literal sends exactly its terminating NUL */
	return pool_write_and_flush(frontend, "", 1);
}
_______________________________________________
Pgpool-general mailing list
Pgpool-general@pgfoundry.org
http://pgfoundry.org/mailman/listinfo/pgpool-general

Reply via email to