Update of /cvsroot/monetdb/clients/src/java/src/nl/cwi/monetdb/xrpc/wrapper
In directory sc8-pr-cvs16:/tmp/cvs-serv15332/src/nl/cwi/monetdb/xrpc/wrapper

Added Files:
        XrpcWrapper.java XrpcWrapperWorker.java wrapper_functions.xq 
Log Message:
Implementation of the XRPC wrapper for external XQuery engines, as
described in our paper submitted to VLDB07:

 The XRPC wrapper is a SOAP service handler that stores the incoming
 SOAP XRPC request messages in a temporary location, generates an XQuery
 query for this request, and executes it on an XQuery processor.
 
 The generated query is crafted to compute the result of a Bulk XRPC by
 calling the requested function on the parameters found in the message,
 and to generate the SOAP response message in XML using element
 construction.

This XRPC wrapper works with Saxon8-9J and Galax 0.7.2.
To use the wrapper:

java -jar <path-to>/xrpcwrapper-0.1.jar \
     --command "<command-to-run-your-xquery-engine>"

The "command" option is mandatory, since the wrapper has to know how to
run the XQuery engine.  

For example, to use Saxon:
java -jar jars/xrpcwrapper-1.0.jar \
     --command "java -cp <path-to>/saxon8.jar net.sf.saxon.Query"

To use Galax:
java -jar jars/xrpcwrapper-1.0.jar \
     --command "<path-to>/galax-run"

The string value of the "command" option is passed literally to a Java
Proc to be executed.  If any options need to be passed to the XQuery
engine, they should be added to the string.

Examples can be found in the Tests sub-directory
(nl/cwi/monetdb/xrpc/Tests).  The module definition file and the XML
documents need to be copied to "/tmp" manually.




--- NEW FILE: XrpcWrapper.java ---
/**
 * The contents of this file are subject to the MonetDB Public License
 * Version 1.1 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html

 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and
 * limitations under the License.

 * The Original Code is the MonetDB Database System.

 * The Initial Developer of the Original Code is CWI.
 * Portions created by CWI are Copyright (C) 1997-2007 CWI.
 * All Rights Reserved.
**/

package nl.cwi.monetdb.xrpc.wrapper;

import java.io.*;
import java.net.*;
import nl.cwi.monetdb.util.*;

/**
 * The XRPC wrapper is a SOAP service handler that stores the incoming
 * SOAP XRPC request messages in a temporary location, generates an
 * XQuery query for this request, and executes it on an XQuery
 * processor.
 *
 * The generated query is crafted to compute the result of a Bulk XRPC
 * by calling the requested function on the parameters found in the
 * message, and to generate the SOAP response message in XML using
 * element construction.
 *
 * @author Ying Zhang <[EMAIL PROTECTED]>
 * @version 0.1
 */

public class XrpcWrapper {
    /** Size (in chars) of the small buffer used for the first read of
     *  the XQuery engine's output. */
    public static final int MIN_BUFSIZE = 128;
    /** Size of the copy buffer used when extracting files from the JAR
     *  and initial capacity of the request header buffer. */
    public static final int DEFAULT_BUFSIZE = 8192;

    /** Version string printed in the welcome header and by --version. */
    public static final String XRPC_WRAPPER_VERSION = "0.1";
    /** Port to listen on when the "port" option is not given. */
    public static final String DEFAULT_PORT = "50002";
    /** Name of the XQuery helper module that is packaged with this
     *  class and extracted into the root directory at start-up. */
    public static final String WF_FILE = "wrapper_functions.xq";
    /** Platform specific file name separator. */
    public static final String FILE_SEPARATOR =
        System.getProperty("file.separator");
    /** Default directory for temporary files: the JVM's temporary
     *  directory followed by a file separator. */
    public static final String DEFAULT_ROOTDIR =
        System.getProperty("java.io.tmpdir") + FILE_SEPARATOR;

    /** Command used when "--engine galax" is given. */
    private static final String COMMAND_GALAX=
        "galax-run -serialize wf";
    /** Command used when "--engine saxon" is given.
     *  NOTE(review): the class path points into one developer's home
     *  directory, so this only works on that machine -- confirm. */
    private static final String COMMAND_SAXON=
        "java -cp /ufs/zhang/saxon8-9j/saxon8.jar net.sf.saxon.Query";


    /** Package-private constructor; instances are created by main(). */
    XrpcWrapper(){}

    private void run(CmdLineOpts opts)
    {
        ServerSocket server = null;

        try {
            int port = Integer.parseInt(
                    opts.getOption("port").getArgument());
            server = new ServerSocket(port);
            if(!opts.getOption("quiet").isPresent()){
                System.out.println(
                        "# XRPC Wrapper " + XRPC_WRAPPER_VERSION + "\n" +
                        "# Copyright (c) 1993-2007, CWI. All rights 
reserved.\n" +
                        "# Listening on port " + port + "\n" +
                        "# Type Ctrl-C to stop\n");
            }

            String toFile = opts.getOption("rootdir").getArgument()+WF_FILE;
            extractFileFromJar(opts, WF_FILE, toFile);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

        for(;;){
            try{
                Socket clntsock = server.accept();
                XrpcWrapperWorker worker =
                    new XrpcWrapperWorker(clntsock, opts);
                worker.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Copies a resource that is packaged with this class (normally
     * inside the wrapper's JAR file) to a file on disk, overwriting
     * the file if it already exists.
     *
     * The copy is done byte-for-byte so that the content is not
     * re-encoded through the platform default character set.
     *
     * @param opts     the command line options; only "debug" is read
     * @param fromFile name of the resource, resolved relative to this
     *                 class
     * @param toFile   path of the file to write
     * @throws Exception if the resource does not exist or an I/O error
     *                   occurs while copying
     */
    private void extractFileFromJar(CmdLineOpts opts,
                                    String fromFile,
                                    String toFile)
        throws Exception
    {
        String infoHeader = "INFO: XrpcWrapper.extractFileFromJar(): ";

        InputStream is = getClass().getResourceAsStream(fromFile);
        if(is == null) {
            throw new Exception("File " + fromFile +
                    " does not exist in the JAR file.");
        }

        OutputStream out = null;
        try {
            out = new FileOutputStream(toFile, false);
            byte[] buf = new byte[DEFAULT_BUFSIZE];
            int ret;
            while((ret = is.read(buf, 0, buf.length)) > 0){
                out.write(buf, 0, ret);
            }
            out.flush();
        } finally {
            /* Release both streams even when the copy failed. */
            try {
                if(out != null) {
                    out.close();
                }
            } finally {
                is.close();
            }
        }

        /* TODO: remove the extracted file during shut-down.  For
         *       this we need to catch Ctrl-C. */
        if(opts.getOption("debug").isPresent()){
            System.out.println(infoHeader + fromFile + " extracted to " +
                    toFile);
        }
    }

    /**
     * Entry point: declares and parses the command line options,
     * handles --help and --version, determines the command that runs
     * the XQuery engine, normalises the root directory and starts the
     * server.
     *
     * The engine command comes either from --command or from one of
     * the built-in commands selected with --engine (saxon or galax).
     * The process exits with -1 when neither is usable.
     *
     * @param args the command line arguments
     * @throws Exception if the options cannot be processed
     */
    public static void main (String[] args)
        throws Exception
    {
        String errHeader = "ERROR: XrpcWrapper.main(): ";

        CmdLineOpts copts = new CmdLineOpts();
        /* arguments which take exactly one argument */
        copts.addOption("x", "command", CmdLineOpts.CAR_ONE, null,
                "How to execute the XQuery engine. " +
                "Specify command + options in *one* string");
        copts.addOption("d", "dump", CmdLineOpts.CAR_ONE, "no",
                "Dump the XRPC request/response message.");
        copts.addOption("e", "engine", CmdLineOpts.CAR_ONE, null,
                "Specify which XQuery engine to use.");
        copts.addOption("k", "keep", CmdLineOpts.CAR_ONE, null,
                "Do not remove the temporary files that contain the " +
                "XRPC request message and/or the generated XQuery " +
                "query after a request has been handled.");
        copts.addOption("p", "port", CmdLineOpts.CAR_ONE, DEFAULT_PORT,
                "The port number to listen to.");
        copts.addOption("r", "rootdir", CmdLineOpts.CAR_ONE,
                DEFAULT_ROOTDIR,
                "The root directory to store temporary files.");
        /* arguments which have no argument(s) */
        copts.addOption("D", "debug", CmdLineOpts.CAR_ZERO, null,
                "Turn on DEBUG mode.");
        copts.addOption("h", "help", CmdLineOpts.CAR_ZERO, null,
                "This help screen.");
        copts.addOption("q", "quiet", CmdLineOpts.CAR_ZERO, null,
                "Suppress printing the welcome header.");
        copts.addOption("t", "timing", CmdLineOpts.CAR_ZERO, null,
                "Display time measurements.");
        copts.addOption("v", "version", CmdLineOpts.CAR_ZERO, null,
                "Display version number and exit.");

        /* process the command line arguments */
        copts.processArgs(args);

        if (copts.getOption("help").isPresent()) {
            System.out.print(
                    "Usage java XrpcWrapper\n" +
                    "           [-x command_STRING] [-e saxon|galax] " +
                    "[-d request|response] [-k request|query|both] " +
                    "[-p port] [-r rootdir]\n" +
                    "           [-D] [-h] [-q] [-t] [-v]\n" +
                    "or using long option equivalents:\n" +
                    "--command --engine --dump --keep --port --rootdir\n" +
                    "--debug --help --quiet --timing " +
                    "--version.\n" +
                    "\n" +
                    "The option -x (--command) is obligatory, " +
                    "unless -e (--engine) is used.  " +
                    "This option specifies the command, " +
                    "together with the options, " +
                    "with which the XQuery engine can be executed " +
                    "with the XQuery query being stored in a file.\n" +
                    "If no port is given, 50002 is assumed.\n" +
                    "\n" +
                    "OPTIONS\n" + copts.produceHelpMessage() );
            System.exit(0);
        }
        if (copts.getOption("version").isPresent()) {
            System.out.println("XRPC Wrapper version " + XRPC_WRAPPER_VERSION);
            System.exit(0);
        }

        /* Determine how to run the XQuery engine. */
        if (copts.getOption("engine").isPresent()){
            CmdLineOpts.OptionContainer engineOpt =
                copts.getOption("engine");
            CmdLineOpts.OptionContainer commandOpt =
                copts.getOption("command");

            String engine = engineOpt.getArgument().toLowerCase();
            if(engine.equals("saxon")){
                commandOpt.addArgument(COMMAND_SAXON);
            } else if (engine.equals("galax")){
                commandOpt.addArgument(COMMAND_GALAX);
            } else {
                /* Without a known engine there is no command to run,
                 * so starting the server would be pointless. */
                System.err.println(errHeader + "unknown engine: " +
                    engine);
                System.exit(-1);
            }
            commandOpt.setPresent();
        } else if (!copts.getOption("command").isPresent()) {
            System.err.println(
                    errHeader + "missing mandatory option: --command\n" +
                    errHeader + "don't know how to execute the XQuery engine.\n" +
                    errHeader + "Use --help to get more information.");
            System.exit(-1);
        }

        /* Make sure the root directory ends with a separator, because
         * file names are appended to it directly.  This is done
         * independently of how the engine command was determined. */
        CmdLineOpts.OptionContainer rootdirOpt =
            copts.getOption("rootdir");
        if (rootdirOpt.isPresent()){
            String rootdir = rootdirOpt.getArgument();
            if(!rootdir.endsWith(FILE_SEPARATOR)) {
                rootdir += FILE_SEPARATOR;
                rootdirOpt.resetArguments();
                rootdirOpt.addArgument(rootdir);
            }
        }

        XrpcWrapper wrapper = new XrpcWrapper();
        wrapper.run(copts);
    }
}

--- NEW FILE: XrpcWrapperWorker.java ---
/**
 * The contents of this file are subject to the MonetDB Public License
 * Version 1.1 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html

 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and
 * limitations under the License.

 * The Original Code is the MonetDB Database System.

 * The Initial Developer of the Original Code is CWI.
 * Portions created by CWI are Copyright (C) 1997-2007 CWI.
 * All Rights Reserved.
**/

package nl.cwi.monetdb.xrpc.wrapper;

import java.io.*;
import java.net.*;
import nl.cwi.monetdb.util.*;

public class XrpcWrapperWorker extends Thread{
    /** Request path that the HTTP "POST" line must carry directly
     *  after the method name. */
    public static final String XRPCD_CALLBACK = "/xrpc";
    /** Namespace URI of the SOAP envelope. */
    public static final String SOAP_NS =
        "http://www.w3.org/2003/05/soap-envelope";;
    /** Namespace URI of the XPath data types. */
    public static final String XDT_NS =
        "http://www.w3.org/2005/xpath-datatypes";;
    /** Namespace URI of XML Schema. */
    public static final String XS_NS =
        "http://www.w3.org/2001/XMLSchema";;
    /** Namespace URI of XML Schema instances. */
    public static final String XSI_NS =
        "http://www.w3.org/2001/XMLSchema-instance";;
    /** Namespace URI of the XRPC protocol elements. */
    public static final String XRPC_NS =
        "http://monetdb.cwi.nl/XQuery";;
    /** Location of the XRPC schema, used in xsi:schemaLocation. */
    public static final String XRPC_LOC =
        "http://monetdb.cwi.nl/XQuery/XRPC.xsd";;
    /** Opening part of the SOAP envelope that is written into the
     *  generated query as an element constructor.  The "env" prefix is
     *  declared by the prolog of the generated query, not here. */
    public static final String SOAP_BODY_START =
        "<env:Envelope xsi:schemaLocation=\"" + XRPC_NS + " " +
                                                XRPC_LOC + "\"> \n" +
          "<env:Body> \n";
    /** Closing part matching SOAP_BODY_START. */
    public static final String SOAP_BODY_END =
          "</env:Body>\n" +
        "</env:Envelope>\n";
    /** Opening part of a SOAP Fault message, up to and including the
     *  "Fault" start tag. */
    public static final String SOAP_FAULT_START =
        "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" +
        "<env:Envelope xmlns:env=\""+SOAP_NS+"\">\n" +
          "<env:Body>\n" +
            "<env:Fault>\n";
    /** Closing part matching SOAP_FAULT_START. */
    public static final String SOAP_FAULT_END =
            "</env:Fault>\n" +
          "</env:Body>\n" +
        "</env:Envelope>";
    /** HTTP header of a successful response, including the empty line
     *  that separates header and body. */
    public static final String HTTP_OK_HEADER =
        "HTTP/1.1 200 OK\r\n" +
        "Content-type: text/xml; charset=\"utf-8\"\r\n\r\n";
    /** HTTP header sent with faults caused by the sender. */
    public static final String HTTP_ERR_400_HEADER =
        "HTTP/1.1 400 Bad Request\r\n" +
        "Content-type: text/xml; charset=\"utf-8\"\r\n\r\n";
    /** HTTP header sent with faults caused by the receiver. */
    public static final String HTTP_ERR_500_HEADER =
        "HTTP/1.1 500 Internal Server Error\r\n" +
        "Content-type: text/xml; charset=\"utf-8\"\r\n\r\n";

    /**
     * Signals that a request could not be handled because of a
     * problem in the request itself.  The worker answers such errors
     * with an HTTP 400 header and a SOAP Fault with code "env:Sender".
     */
    class SenderException extends Exception
    {
        /** @param reason text to report in the SOAP Fault */
        public SenderException(String reason) { super(reason); }
    }

    /**
     * Signals that a request could not be handled because of a
     * problem on the wrapper's side.  The worker answers such errors
     * with an HTTP 500 header and a SOAP Fault with code
     * "env:Receiver".
     */
    class ReceiverException extends Exception
    {
        /** @param reason text to report in the SOAP Fault */
        public ReceiverException(String reason) { super(reason); }
    }

    /** Connection to the client served by this worker. */
    private Socket sock;
    /** Command line options of the wrapper. */
    private CmdLineOpts opts;
    /** Id of this thread; used in log messages and in the names of
     *  the temporary files. */
    private long tid;
    /** Directory (value of the "rootdir" option) in which temporary
     *  files are created; file names are appended to it directly. */
    private String rootdir;
    /** Value of the "Content-Length" field of the request header. */
    private int contentLength;
    /** Whether the "debug" option was given. */
    private boolean debug;

    /**
     * Creates a worker for one client connection.
     *
     * @param s the accepted client socket
     * @param o the wrapper's command line options; "rootdir" and
     *          "debug" are read immediately
     * @throws Exception if an option cannot be retrieved
     */
    XrpcWrapperWorker(Socket s, CmdLineOpts o)
        throws Exception
    {
        this.sock = s;
        this.opts = o;
        this.tid = getId();
        this.rootdir = o.getOption("rootdir").getArgument();
        this.debug = o.getOption("debug").isPresent();
    }

    /**
     * Prints the message on standard output, without adding a line
     * break, when DEBUG mode is on; does nothing otherwise.
     */
    private void DEBUG(String msg)
    {
        if(!debug) {
            return;
        }
        System.out.print(msg);
    }

    /**
     * Sends an HTTP error header followed by a SOAP Fault message.
     *
     * The fault text is taken from 'errMsg' if it is not null,
     * otherwise everything that can be read from 'errorStream' is
     * used.  The text is XML-escaped before it is sent, because it may
     * contain arbitrary characters (e.g. a quoted request line or the
     * diagnostics of the XQuery engine) that would otherwise make the
     * fault message malformed.
     *
     * This method never throws: failures are reported on standard
     * error only, since there is no other way left to inform the
     * client.
     *
     * @param sockOut         writer connected to the client
     * @param httpErrorHeader complete HTTP header to send first
     * @param faultCode       value of the SOAP fault code, e.g.
     *                        "env:Sender"
     * @param errorStream     source of the fault text; only used (and
     *                        then closed) when 'errMsg' is null
     * @param errMsg          the fault text, or null
     */
    private void sendError(Writer sockOut,
                           String httpErrorHeader,
                           String faultCode,
                           InputStream errorStream,
                           String errMsg)
    {
        String errHeader = "ERROR: (TID "+tid+") sendError(): ";

        String faultStart = httpErrorHeader + SOAP_FAULT_START +
                    "<env:Code>\n" +
                      "<env:Value>" + faultCode + "</env:Value>\n" +
                    "</env:Code>\n" +
                    "<env:Reason>\n" +
                      "<env:Text xml:lang=\"en\">";
        String faultEnd = "\n</env:Text>\n</env:Reason>\n" +
                    SOAP_FAULT_END;

        try{
            /* Check the arguments before anything is written, so that
             * a programming error does not produce half a message. */
            if(errMsg == null && errorStream == null) {
                throw new NullPointerException(
                        "Either an InputStream or a String must " +
                        "be specified.");
            }

            DEBUG(errHeader + "SOAP Fault message to send:\n" +
                    faultStart + "\n");
            sockOut.write(faultStart);

            if(errMsg != null) {
                String text = escapeXml(errMsg);
                sockOut.write(text);
                DEBUG(text);
            } else {
                BufferedReader errIn = new BufferedReader(new
                        InputStreamReader(errorStream));
                try {
                    char[] cbuf = new char[XrpcWrapper.MIN_BUFSIZE];
                    int len;
                    while((len = errIn.read(cbuf, 0, cbuf.length)) >= 0) {
                        String text = escapeXml(new String(cbuf, 0, len));
                        sockOut.write(text);
                        DEBUG(text);
                    }
                } finally {
                    errIn.close();
                }
            }
            sockOut.write(faultEnd);
            DEBUG(faultEnd);
        } catch (Exception e) {
            System.err.println(errHeader + "caught exception.\n");
            e.printStackTrace();
        }
    }

    /**
     * Replaces the characters that have a special meaning in XML
     * character data ('&', '<' and '>') by their entity references.
     */
    private static String escapeXml(String text)
    {
        StringBuffer out = new StringBuffer(text.length() + 16);
        for(int i = 0; i < text.length(); i++){
            char c = text.charAt(i);
            switch(c){
                case '&': out.append("&amp;"); break;
                case '<': out.append("&lt;");  break;
                case '>': out.append("&gt;");  break;
                default:  out.append(c);       break;
            }
        }
        return out.toString();
    }

    /**
     * Read the HTTP header of a request and validate the header.
     *
     * Lines are read until the empty line that ends the header.  The
     * header must contain a "POST" line for the path XRPCD_CALLBACK
     * and a "Content-Length" field, whose value is stored in
     * 'contentLength'.  The field name is matched case-insensitively
     * and white space around the value is ignored.
     *
     * Throws: SenderException if the connection is closed before the
     *         end of the header, or if the header is invalid.
     **/
    private void handleHttpReqHeader(BufferedReader sockIn)
        throws Exception
    {
        String infoHeader = "INFO: (TID "+tid+") handleHttpReqHeader(): ";
        String clField = "Content-Length:";

        boolean foundPostHeader = false, foundClHeader = false;

        DEBUG(infoHeader + "HTTP header of XRPC request:\n");
        String ln = sockIn.readLine();
        /* TODO: should check 'HOST' as well! */
        while(ln != null && ln.length() > 0){
            DEBUG(infoHeader + ln + "\n");
            if(ln.startsWith("POST")){
                if(!ln.startsWith(XRPCD_CALLBACK, 5)){
                    throw new SenderException(
                            "Unsupported Request: \"" + ln + "\"");
                }
                foundPostHeader = true;
            } else if(ln.regionMatches(true, 0, clField, 0,
                                       clField.length())) {
                String value = ln.substring(clField.length()).trim();
                try{
                    contentLength = Integer.parseInt(value);
                } catch(NumberFormatException e) {
                    throw new SenderException("Invalid value of " +
                            "\"Content-Length\": \"" +
                            value + "\": " + e.getMessage());
                }
                if(contentLength < 0){
                    throw new SenderException("Invalid value of " +
                            "\"Content-Length\": \"" + value +
                            "\": must not be negative");
                }
                foundClHeader = true;
            }
            ln = sockIn.readLine();
        }

        if(ln == null){
            /* readLine() returns null at end of stream: the client
             * went away before sending the empty line. */
            throw new SenderException("Connection closed before the " +
                    "end of the HTTP header.");
        }

        if(!foundPostHeader){
            throw new SenderException("HTTP header does not contain " +
                    "a \"POST\" method definition.");
        } else if (!foundClHeader){
            throw new SenderException("HTTP header does not contain " +
                    "the mandatory \"Content-Length\" field.");
        }
    }

    /**
     * Read an XRPC request and store it on disk under the given
     * filename.  This function also stores the request header (from
     * the start of the message up to the first ">" that follows the
     * first occurrence of "request") in a StringBuffer so that the
     * attribute values can be retrieved.
     *
     * Exactly 'contentLength' characters are read; nothing is read
     * beyond that.  If the stream ends earlier, the file is removed
     * and a SenderException is thrown.
     *
     * NOTE(review): 'contentLength' comes from the HTTP header, where
     * it counts bytes, but characters are counted here.  The two only
     * agree when every character is decoded from a single byte --
     * confirm how multi-byte content should be handled.
     *
     * Returns: a StringBuffer if no error occurred;
     *          throws a new Exception otherwise.
     **/
    private StringBuffer storeXrpcRequest(BufferedReader sockIn,
                                          String filename)
        throws Exception
    {
        String infoHeader = "INFO: (TID "+tid+") getXrpcRequest(): ";

        StringBuffer buf = new
            StringBuffer(XrpcWrapper.DEFAULT_BUFSIZE);
        BufferedWriter fileOut = new
            BufferedWriter(new FileWriter(filename, false));
        int len = 0;
        boolean stored = false;

        DEBUG(infoHeader + "reading XRPC request...\n");

        try {
            boolean headerDone = false;
            int c;
            while (len < contentLength && (c = sockIn.read()) >= 0) {
                len++;
                if (headerDone) {
                    /* the rest of the message goes straight to disk */
                    fileOut.write(c);
                    continue;
                }
                buf.append((char)c);
                /* The header is complete at the closing symbol of the
                 * "request" tag. */
                if (c == '>' && buf.indexOf("request") >= 0) {
                    fileOut.write(buf.toString());
                    headerDone = true;
                }
            }
            if (!headerDone) {
                /* No "request" tag found: keep what was received, the
                 * caller will report the malformed request. */
                fileOut.write(buf.toString());
            }

            if (len != contentLength) {
                throw new SenderException("bytes received: " + len +
                                          ", should be: " + contentLength);
            }

            fileOut.close();
            stored = true;
        } finally {
            if (!stored) {
                try {
                    fileOut.close();
                } catch (IOException ignored) {
                    /* the original failure is the one to report */
                }
                new File(filename).delete();
            }
        }

        DEBUG(infoHeader + "request (" + len + " bytes) stored in: "
                + filename + "\n");

        /* TODO: remove the temporary file 'filename' if necessary */
        return buf;
    }

    /**
     * Retrieve the value of given 'attribute' from the given
     * 'request'.
     *
     * The value is the text between the first quote character that
     * follows the first occurrence of the attribute name and the next
     * occurrence of the same quote character.  Both double and single
     * quotes are accepted as delimiters, as XML allows either.
     * Entity references in the value are returned as they are.
     *
     * Returns: a new string containing the value of the attribute;
     *          throws Exception if error occurred.
     **/
    private String getAttributeValue(StringBuffer request,
                                     String attribute)
        throws Exception
    {
        String infoHeader = "INFO: (TID "+tid+") getAttributeValue(): ";

        int nameAt = request.indexOf(attribute);
        if(nameAt < 0){
            throw new SenderException("invalid XRPC request: could " +
                    "not find the attribute \"" + attribute + "\".");
        }

        /* Locate the opening delimiter: whichever quote character
         * comes first after the attribute name. */
        int from = nameAt + attribute.length();
        int dquote = request.indexOf("\"", from);
        int squote = request.indexOf("'", from);
        String delimiter = "\"";
        int start = dquote;
        if(squote >= 0 && (dquote < 0 || squote < dquote)){
            delimiter = "'";
            start = squote;
        }
        if(start < 0){
            throw new SenderException("invalid XRPC request: " +
                    "attribute \"" + attribute +
                    "\" does not have a string value.");
        }

        start++;
        int end = request.indexOf(delimiter, start);
        if(end < 0){
            throw new SenderException("invalid XRPC request: " +
                    "attribute \"" + attribute +
                    "\" does not have a string value.");
        }

        String value = request.substring(start, end);
        DEBUG(infoHeader + attribute + " = " + value + "\n");
        return value;
    }

    /**
     * Find the prefix of the XRPC namespace
     *
     * The attributes of the "Envelope" start tag are searched for a
     * declaration of the form xmlns:PREFIX="XRPC_NS".  Occurrences of
     * the namespace URI that are not directly followed by a closing
     * double quote (e.g. inside a schemaLocation value) are skipped.
     *
     * Returns: the prefix if succeeded,
     *          throws new Exception otherwise.
     **/
    private String getXrpcPrefix(StringBuffer request)
        throws Exception
    {
        String infoHeader = "INFO: (TID "+tid+") getXrpcPrefix(): ";
        int nsLen = XRPC_NS.length();

        int start = request.indexOf("Envelope");
        if(start < 0){
            throw new SenderException(
                    "XRPC request message not well-formed: " +
                    "could not find the \"Envelope\" tag.");
        }
        start += 8;

        int end = request.indexOf(">", start);
        if(end < 0){
            throw new SenderException(
                    "XRPC request message not well-formed: " +
                    "could not find the end of the \"Envelope\" tag.");
        }

        /* Cut off the name space declarations in the "Envelope" tag and
         * remove all white space characters. */
        String str = request.substring(start, end).replaceAll("[ \t\r\n]", "");

        int from = 0;
        int nsAt;
        while((nsAt = str.indexOf(XRPC_NS, from)) >= 0){
            int nsEnd = nsAt + nsLen;
            if(nsEnd < str.length() && str.charAt(nsEnd) == '"'){
                /* The URI is a complete attribute value.  Since all
                 * white space is gone, the attribute name starts right
                 * after the closing quote of the previous attribute,
                 * or at the start of the string. */
                int eq = str.lastIndexOf("=", nsAt);
                int declStart = (eq < 0) ? 0 :
                    str.lastIndexOf("\"", eq) + 1;
                if(eq < 0 || !str.startsWith("xmlns", declStart)){
                    throw new SenderException(
                            "XRPC request message not well-formed: " +
                            "\"xmlns\" expected in a namespace declaration.");
                }
                /* "xmlns:" is 6 characters; a prefix must follow */
                if(!str.startsWith("xmlns:", declStart) ||
                        eq <= declStart + 6){
                    throw new SenderException(
                            "XRPC request message not supported: " +
                            "the XRPC namespace \"" + XRPC_NS +
                            "\" must be bound to a prefix.");
                }
                String prefix = str.substring(declStart + 6, eq);
                DEBUG(infoHeader + "found XRPC namespace identifier: " +
                        prefix + "\n");
                return prefix;
            }
            from = nsEnd;
        }

        throw new SenderException(
                "XRPC request message not well-formed: " +
                "declaration of the XRPC namespace \"" + XRPC_NS +
                "\" not found.");
    }

    /**
     * Generate an XQuery query for the request and write the query
     * to a file.
     *
     * The query imports the requested module and the wrapper's helper
     * module, iterates over all "call" elements of the stored request
     * and wraps the results of the requested function in a SOAP
     * envelope.
     *
     * The namespace prefix, the module, the method, the location and
     * the arity are taken from the request header and therefore come
     * from the client.  Because they are pasted into the text of the
     * query, they are validated first: prefix and method must be
     * simple names, module and location must not contain characters
     * that end a string literal or start an enclosed expression.
     *
     * Throws: SenderException if the request header is invalid;
     *         ReceiverException if the query file cannot be written.
     **/
    private void generateQuery(String requestFilename,
                              String queryFilename,
                              StringBuffer requestHeader)
        throws Exception
    {
        String infoHeader = "INFO: (TID "+tid+") generateQuery(): ";

        String xrpcPrefix = getXrpcPrefix(requestHeader);
        String xqModule = getAttributeValue(requestHeader,
                xrpcPrefix+":module");
        String xqMethod = getAttributeValue(requestHeader,
                xrpcPrefix+":method");
        String xqLocation = getAttributeValue(requestHeader,
                xrpcPrefix+":location");
        /* the arity attribute lives in the same namespace as the
         * others, so it carries the same prefix */
        String arityStr = getAttributeValue(requestHeader,
                xrpcPrefix+":arity");

        long arity;
        try{
            arity = Long.parseLong(arityStr.trim());
        } catch (NumberFormatException e) {
            throw new SenderException("invalid XRPC request: " +
                    "arity \"" + arityStr + "\" is not a number.");
        }
        if(arity < 0){
            throw new SenderException("invalid XRPC request: " +
                    "arity \"" + arityStr + "\" is negative.");
        }

        requireSimpleName("namespace prefix", xrpcPrefix);
        requireSimpleName("method", xqMethod);
        requireSafeLiteral("module", xqModule);
        requireSafeLiteral("location", xqLocation);

        FileWriter fileOut = null;
        try{
            fileOut = new FileWriter(queryFilename, false);

            fileOut.write(
                    "import module namespace modfunc = \"" + xqModule + "\"\n" +
                    "            at \"" + xqLocation + "\";\n\n" +
                    "import module namespace wf = \"xrpcwrapper\" \n" +
                    "       at \"" + rootdir + XrpcWrapper.WF_FILE + "\";\n\n" +
                    "declare namespace env=\"" + SOAP_NS + "\";\n" +
                    "declare namespace xs=\"" + XS_NS + "\";\n\n" +
                    "declare namespace xrpc=\"" + XRPC_NS + "\";\n\n" +
                    SOAP_BODY_START +
                    "<xrpc:response xrpc:module=\"" + xqModule + "\" " +
                    "xrpc:method=\"" + xqMethod + "\">{\n" +
                    "  for $call in doc(\"" + requestFilename + "\")" +
                    "//" + xrpcPrefix + ":call\n");
            for(long i = 1; i <= arity; i++){
                /* XQuery index of item sequences starts from 1 */
                fileOut.write("  let $param" + i + " := " +
                        "wf:n2s(\"" + xrpcPrefix + "\", " +
                                "$call/" + xrpcPrefix + ":sequence["+i+"]" +
                        ")\n");
            }
            fileOut.write("  return wf:s2n(modfunc:"+xqMethod+"(");
            for(long i = 1; i <= (arity - 1); i++){
                fileOut.write("$param" + i + ", ");
            }
            if(arity > 0){
                fileOut.write("$param" + arity);
            }
            fileOut.write("))\n}</xrpc:response>\n" + SOAP_BODY_END);
            fileOut.close();
            fileOut = null;
        } catch (Exception e){
            throw new ReceiverException(
                    "failed to generate query for the request: " +
                    e.getMessage());
        } finally {
            if(fileOut != null){
                try{
                    fileOut.close();
                } catch (IOException ignored) {
                    /* the original failure is the one to report */
                }
            }
        }

        DEBUG(infoHeader + "query generated in "+queryFilename+"\n");
    }

    /**
     * Throws a SenderException unless 'value' consists of a letter or
     * underscore followed by letters, digits, '_', '.' or '-'.
     */
    private void requireSimpleName(String what, String value)
        throws SenderException
    {
        if(!value.matches("[\\p{L}_][\\p{L}\\p{N}_.\\-]*")){
            throw new SenderException("invalid XRPC request: " +
                    what + " \"" + value + "\" is not a valid name.");
        }
    }

    /**
     * Throws a SenderException if 'value' contains a character that
     * would end a string literal or start an enclosed expression or a
     * tag in the generated query.
     */
    private void requireSafeLiteral(String what, String value)
        throws SenderException
    {
        String forbidden = "\"{}<>";
        for(int i = 0; i < value.length(); i++){
            if(forbidden.indexOf(value.charAt(i)) >= 0){
                throw new SenderException("invalid XRPC request: " +
                        what + " contains the illegal character '" +
                        value.charAt(i) + "'.");
            }
        }
    }

    /**
     * Call the XQuery engine to execute the query and send the
     * results back
     *
     * The engine is started with the value of the "command" option
     * followed by the name of the query file.  If anything can be read
     * from the engine's standard output, an HTTP OK header is sent,
     * followed by everything the engine writes to standard output.
     * Otherwise a SOAP Fault is sent with the content of the engine's
     * standard error as fault text.
     *
     * The streams of the engine process are always closed, and the
     * process is destroyed if this method is left before the process
     * has been waited for.
     *
     * NOTE(review): standard error is not read while standard output
     * is being copied; an engine that writes a lot to standard error
     * could presumably block -- confirm whether this matters for the
     * supported engines.
     *
     * Throws: ReceiverException if the engine cannot be started.
     **/
    private void execAndSend(String queryFilename,
                             BufferedWriter sockOut)
        throws Exception
    {
        String infoHeader = "INFO: (TID "+tid+") execAndSend(): ";
        int ret = -1, len = -1;
        char[] cbuf = new char[XrpcWrapper.MIN_BUFSIZE];
        String command = null;
        Process proc = null;

        try{
            command = opts.getOption("command").getArgument() + " " +
                queryFilename;
            DEBUG(infoHeader + "executing: " + command + "\n");
            proc = Runtime.getRuntime().exec(command);
        } catch (OptionsException oe) {
            throw new ReceiverException("This should not happen: " +
                    "XRPC wrapper does not know how to execute the " +
                    "XQuery engine: " + oe.getMessage());
        } catch (Exception e){
            throw new ReceiverException("Error occurred during " +
                    "execution of command \"" + command + "\":" +
                    e.getMessage());
        }

        boolean finished = false;
        BufferedReader procIn = new BufferedReader(new
                InputStreamReader(proc.getInputStream()));
        try{
            try{
                len = procIn.read(cbuf, 0, cbuf.length);
            } catch (Exception e){
                /* Don't throw exception, try to read proc's ErrorStream */
                len = -1;
            }

            if (len >= 0){
                DEBUG(infoHeader + "query execution seems succeeded: " +
                        "got output from the process' InputStream.\n");
                DEBUG(infoHeader + "sending XRPC response message:\n");

                sockOut.write(HTTP_OK_HEADER);
                DEBUG(HTTP_OK_HEADER);

                /* forward the output chunk by chunk, starting with the
                 * chunk that was read above */
                do {
                    sockOut.write(cbuf, 0, len);
                    DEBUG(new String(cbuf, 0, len));
                    len = procIn.read(cbuf, 0, cbuf.length);
                } while (len >= 0);
            } else { /* send SOAP Fault message */
                sendError(sockOut, HTTP_ERR_500_HEADER, "env:Receiver",
                        proc.getErrorStream(), null);
            }

            ret = proc.waitFor();
            finished = true;
            DEBUG(infoHeader + "query execution exits with: " + ret + "\n");
        } finally {
            if(!finished){
                /* e.g. the client went away: don't leave the engine
                 * running */
                proc.destroy();
            }
            closeProcessStream(procIn);
            closeProcessStream(proc.getErrorStream());
            closeProcessStream(proc.getOutputStream());
        }
    }

    /**
     * Closes a stream of the engine process; a failure to do so is
     * ignored because nothing can be done about it.
     */
    private static void closeProcessStream(Closeable stream)
    {
        try{
            stream.close();
        } catch (IOException ignored) {
            /* best effort only */
        }
    }

    /**
     * Main function to handle an XRPC request:
     *  1. read the request and write it to a file
     *  2. generate an XQuery query for the request and write the
     *     query to a file
     *  3. Call the XQuery engine to execute the query and send the
     *     results back
     *
     * Afterwards the temporary files are removed, also when one of
     * the steps failed, unless the "keep" option says otherwise:
     * "request" keeps the request file, "query" keeps the query file
     * and any other value keeps both.
     **/
    private void handleXrpcReq(BufferedReader sockIn, BufferedWriter sockOut)
        throws Exception
    {
        String infoHeader = "INFO: (TID "+tid+") handleXrpcReq(): ";
        String warnHeader = "WARNING: (TID "+tid+") handleXrpcReq(): ";

        String requestFilename = rootdir+"xrpc_request_"+tid+".xml";
        String queryFilename = rootdir+"xrpc_wrapper_query_"+tid+".xq";

        try {
            StringBuffer requestHeader =
                storeXrpcRequest(sockIn, requestFilename);
            generateQuery(requestFilename, queryFilename, requestHeader);
            execAndSend(queryFilename, sockOut);
            DEBUG(infoHeader + "DONE: " + sock + "\n\n");
        } finally {
            try {
                boolean keepQuery = false, keepRequest = false;
                if(opts.getOption("keep").isPresent()){
                    String arg = opts.getOption("keep").getArgument();
                    keepQuery = !"request".equals(arg);
                    keepRequest = !"query".equals(arg);
                }
                if(!keepQuery){
                    removeTempFile(new File(queryFilename), warnHeader);
                }
                if(!keepRequest){
                    removeTempFile(new File(requestFilename), warnHeader);
                }
            } catch (Exception e) {
                System.out.println(warnHeader +
                        "failed to delete temporary file(s):");
                e.printStackTrace();
            }
        }
    }

    /**
     * Deletes the file if it exists and prints a warning if that
     * fails.
     */
    private void removeTempFile(File file, String warnHeader)
    {
        if(file.exists() && !file.delete()){
            System.out.println(warnHeader +
                    "failed to delete temporary file: " + file);
        }
    }

    /**
     * Handles one client connection: reads and validates the HTTP
     * header, handles the XRPC request and finally closes the
     * connection.
     *
     * A SenderException is answered with an HTTP 400 / "env:Sender"
     * fault and a ReceiverException with an HTTP 500 / "env:Receiver"
     * fault.  Any other exception is only reported on standard error.
     *
     * The writer, the reader and the socket are closed independently
     * of each other, so that a failure to close one of them (or a
     * stream that was never opened) does not keep the socket open.
     */
    public void run()
    {
        String warnHeader = "WARNING: (TID "+tid+") run(): ";
        String errHeader = "ERROR: (TID "+tid+") run(): ";

        BufferedReader sockIn = null;
        BufferedWriter sockOut = null;

        try{
            sockIn = new BufferedReader(new
                    InputStreamReader(sock.getInputStream()));
            sockOut = new BufferedWriter(new
                    OutputStreamWriter(sock.getOutputStream()));

            handleHttpReqHeader(sockIn);
            handleXrpcReq(sockIn, sockOut);
        } catch (SenderException se) {
            sendError(sockOut, HTTP_ERR_400_HEADER, "env:Sender",
                        null, se.getMessage());
        } catch (ReceiverException re) {
            sendError(sockOut, HTTP_ERR_500_HEADER, "env:Receiver",
                        null, re.getMessage());
        } catch (Exception e1){
            System.err.println(errHeader + "caught exception:");
            e1.printStackTrace();
        } finally {
            /* close the writer first: this flushes the response */
            if(sockOut != null){
                try{
                    sockOut.close();
                } catch (Exception e2) {
                    System.err.println(warnHeader +
                            "caught exception in FINAL block:");
                    e2.printStackTrace();
                }
            }
            if(sockIn != null){
                try{
                    sockIn.close();
                } catch (Exception e2) {
                    System.err.println(warnHeader +
                            "caught exception in FINAL block:");
                    e2.printStackTrace();
                }
            }
            try{
                sock.close();
            } catch (Exception e2) {
                System.err.println(warnHeader +
                        "caught exception in FINAL block:");
                e2.printStackTrace();
            }
        }
    }
}

--- NEW FILE: wrapper_functions.xq ---
module namespace wrapper = "xrpcwrapper";

(: Prefix used for the XRPC elements built by the functions of this
 : module. :)
declare namespace xrpcm = "http://monetdb.cwi.nl/XQuery";

(: Wraps a single item in the XRPC element that describes its kind.
 :
 : Atomic values become an xrpcm:atomic-value element whose xsi:type
 : attribute names the value's type; nodes are placed inside an element
 : named after their node kind (xrpcm:attribute, xrpcm:comment,
 : xrpcm:document, xrpcm:element, xrpcm:processing-instruction,
 : xrpcm:text or xrpcm:node).  Anything else ends up in xrpcm:item.
 :
 : $val  the item to wrap
 : returns the wrapping element
 :)
declare function wrapper:add-type($val as item()) as node()
{
      typeswitch ($val)
        case xs:anyAtomicType return
          (: The first matching case wins, so derived types are listed
           : before the types they derive from. :)
          let $type-name :=
            typeswitch ($val)
              case xs:boolean       return "xs:boolean"
              case xs:integer       return "xs:integer"
              case xs:decimal       return "xs:decimal"
              (: pathfinder does not support xs:float yet, so replace it
               : with xs:double :)
              case xs:float         return "xs:double"
              case xs:double        return "xs:double"
              case xs:anyURI        return "xs:anyURI"
              case xs:string        return "xs:string"
              case xs:untypedAtomic return "xs:untypedAtomic"
              default               return "xs:anyAtomicType"
          return
          <xrpcm:atomic-value xsi:type="{$type-name}">
            {$val}
          </xrpcm:atomic-value>
        case attribute() return
          <xrpcm:attribute>{$val}</xrpcm:attribute>
        case comment() return
          <xrpcm:comment>{$val}</xrpcm:comment>
        case document-node() return
          <xrpcm:document>{$val}</xrpcm:document>
        case element() return
          <xrpcm:element>{$val}</xrpcm:element>
        case processing-instruction() return
          <xrpcm:processing-instruction>
            {$val}
          </xrpcm:processing-instruction>
        case text() return
          <xrpcm:text>{$val}</xrpcm:text>
        case node() return
          <xrpcm:node>{$val}</xrpcm:node>
        default return
          <xrpcm:item>{$val}</xrpcm:item>
};

(: Turns a sequence into its XRPC representation: one xrpcm:sequence
 : element holding, in order, the result of wrapper:add-type() for every
 : item.  The empty sequence yields an empty xrpcm:sequence element.
 :
 : $seq  the sequence to encode
 : returns the xrpcm:sequence element
 :)
declare function wrapper:s2n($seq as item()*) as node()
{
  if (exists($seq)) then
    <xrpcm:sequence> {
      for $item in $seq
      return wrapper:add-type($item)
    } </xrpcm:sequence>
  else
    <xrpcm:sequence/>
};

(: Encodes a sequence as an xrpcm:sequence element in which every item
 : is placed inside an xrpcm:element whose xsi:type attribute carries
 : the caller-supplied type name.
 :
 : $seq  the sequence to encode
 : $qn   the string written into each xsi:type attribute
 : returns the xrpcm:sequence element
 :)
declare function wrapper:udf-s2n($seq as item()*,
                                 $qn as xs:string) as node()
{
    <xrpcm:sequence> {
      for $member in $seq
      return
        <xrpcm:element xsi:type="{$qn}">{$member}</xrpcm:element>
    } </xrpcm:sequence>
};

(: Decodes XRPC-encoded values back into XQuery items.
 :
 : Every child element of $nodes is inspected; its lexical name, compared
 : against the prefix $nsid, selects the decoding:
 :   - atomic-value: cast to xs:boolean, xs:integer, xs:decimal or
 :     xs:double according to @xsi:type; for any other type the text
 :     node(s) of the element are returned
 :   - attribute, comment, document, processing-instruction, text:
 :     a new node of that kind is constructed from the wrapped content
 :   - element, node, anyNode, item: the first wrapped element is rebuilt
 :     as a new tree
 : Children with any other name contribute nothing to the result.
 :
 : $nsid   namespace prefix (without the colon) used by the wrapper
 :         elements in the message
 : $nodes  the elements whose children are the wrapped values
 : returns the decoded items, in document order of the wrappers
 :)
declare function wrapper:n2s($nsid as xs:string, $nodes as node()*) as item()*
{
    for $typenode in $nodes/child::*
    let $kind := $typenode/name()
    let $type := string($typenode/@xsi:type)
    return
      if ($kind = concat($nsid, ":atomic-value")) then
        if ($type = "xs:boolean") then
            $typenode cast as xs:boolean
        else if ($type = "xs:integer") then
            $typenode cast as xs:integer
        else if ($type = "xs:decimal") then
            $typenode cast as xs:decimal
        else if ($type = "xs:double") then
            $typenode cast as xs:double
        else
            (: xs:string, xs:anySimpleType and every other type are
             : handed back as the text node(s) of the wrapper :)
            $typenode/text()
      else if ($kind = concat($nsid, ":attribute")) then
        let $attr-name := $typenode/attribute::*[1]/name()
        return attribute {$attr-name} { $typenode/attribute::* }
      else if ($kind = concat($nsid, ":comment")) then
        comment {$typenode/comment()}
      else if ($kind = concat($nsid, ":document")) then
        document {$typenode/child::*}
      else if ($kind = concat($nsid, ":element") or
               $kind = concat($nsid, ":node")    or
               $kind = concat($nsid, ":anyNode") or
               $kind = concat($nsid, ":item")) then
        (: Rebuild the first wrapped element as a new tree, keeping its
         : attributes and all of its child nodes (text, comments and
         : processing instructions included, not only child elements).
         : node-name() carries the namespace along, which the lexical
         : name() string cannot do for prefixes unknown to this module. :)
        let $elem := $typenode/child::*[1]
        return element {node-name($elem)}
                       {$elem/attribute::*, $elem/child::node()}
      else if ($kind = concat($nsid, ":processing-instruction")) then
        let $pi := $typenode/processing-instruction()
        return processing-instruction { $pi/name() } {$pi}
      else if ($kind = concat($nsid, ":text")) then
        text {$typenode/text()}
      else ()
};


-------------------------------------------------------------------------
This SF.net email is sponsored by DB2 Express
Download DB2 Express C - the FREE version of DB2 express and take
control of your XML. No limits. Just data. Click to get it now.
http://sourceforge.net/powerbar/db2/
_______________________________________________
Monetdb-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-checkins

Reply via email to