Update of /cvsroot/monetdb/java/src/nl/cwi/monetdb/xquery/xrpc/wrapper
In directory
sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv10861/src/nl/cwi/monetdb/xquery/xrpc/wrapper
Added Files:
XRPCWrapper.java XRPCWrapperWorker.java wrapper_functions.xq
Log Message:
Moved the XRPC Wrapper to the new "java" module
--- NEW FILE: XRPCWrapper.java ---
/**
* The contents of this file are subject to the MonetDB Public License
* Version 1.1 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and
* limitations under the License.
* The Original Code is the MonetDB Database System.
* The Initial Developer of the Original Code is CWI.
* Portions created by CWI are Copyright (C) 1997-2007 CWI.
* All Rights Reserved.
**/
package nl.cwi.monetdb.xquery.xrpc.wrapper;
import java.io.*;
import java.net.*;
import nl.cwi.monetdb.util.*;
import nl.cwi.monetdb.xquery.xrpc.api.*;
/**
* The XRPC wrapper is a SOAP service handler that stores the incoming
* SOAP XRPC request messages in a temporary location, generates an
* XQuery query for this request, and executes it on an XQuery
* processor.
*
* The generated query is crafted to compute the result of a Bulk XRPC
* by calling the requested function on the parameters found in the
* message, and to generate the SOAP response message in XML using
* element construction.
*
* @author Ying Zhang <[EMAIL PROTECTED]>
* @version 0.1
*/
public class XRPCWrapper {
public static final int MIN_BUFSIZE = 128;
public static final String FILE_SEPARATOR =
System.getProperty("file.separator");
public static final String DEFAULT_ROOTDIR =
System.getProperty("java.io.tmpdir") + FILE_SEPARATOR;
public static final String XRPC_WRAPPER_VERSION = "0.1";
public static final String XRPCD_CALLBACK = "/xrpc";
public static final String DEFAULT_PORT = "50002";
public static final String PACKAGE_PATH =
"/nl/cwi/monetdb/xquery/xrpc/wrapper";
public static final String WF_FILE = "wrapper_functions.xq";
public static final String WELCOME_MSG =
"# XRPC Wrapper v" + XRPC_WRAPPER_VERSION + "\n" +
"# Copyright (c) 1993-2007, CWI. All rights reserved.\n\n";
CmdLineOpts opts;
XRPCWrapper(CmdLineOpts o)
{
opts = o;
}
private void run()
{
ServerSocket server = null;
try {
int port = Integer.parseInt(
opts.getOption("port").getArgument());
server = new ServerSocket(port);
if(!opts.getOption("quiet").isPresent()){
System.out.println(
"# XRPC Wrapper v" +
XRPC_WRAPPER_VERSION + "\n" +
"# Copyright (c) 1993-2007,
CWI. All rights reserved.\n" +
"# Listening on port " + port +
"\n" +
"# Type Ctrl-C to stop\n");
}
String toFile =
opts.getOption("rootdir").getArgument()+WF_FILE;
/* Extract the XQuery module file to a temporary directory. */
Extract.extractFile(PACKAGE_PATH + "/" + WF_FILE,
toFile);
if(opts.getOption("debug").isPresent()) {
System.out.println("# XQuery module file \"" + WF_FILE +
"\" extracted to \"" + toFile + "\"");
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
for(;;){ /* Run server for ever, until someone kills it */
try{
Socket clntsock = server.accept();
XRPCWrapperWorker worker =
new XRPCWrapperWorker(clntsock, opts);
worker.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static CmdLineOpts initCmdLineOpts()
{
CmdLineOpts copts = new CmdLineOpts();
try{
/* arguments which take exactly one argument */
copts.addOption("c", "command", CmdLineOpts.CAR_ONE, null,
"This option is MANDATORY! This option specifies " +
"the command for executing the XQuery engine and all " +
"options that should be passed to the XQuery engine. " +
"The command and all options MUST be specified in " +
"ONE string.");
/* For example:
* --command "galax-run -serialize wf"
* --command "java -cp <pathto>/saxon8.jar net.sf.saxon.Query"
*/
copts.addOption("p", "port", CmdLineOpts.CAR_ONE, DEFAULT_PORT,
"The port number to listen to\n" +
"(dflt: " + DEFAULT_PORT + ")");
copts.addOption("r", "rootdir", CmdLineOpts.CAR_ONE,
DEFAULT_ROOTDIR,
"The root directory to store temporary files\n" +
"(dflt: " + DEFAULT_ROOTDIR + ").");
copts.addOption("R", "remove", CmdLineOpts.CAR_ONE, null,
"Remove the temporary files (<request|query|all>) " +
"that contain the XRPC request message " +
"(--remove request) and/or the generated XQuery " +
"query (--remove query) after a request has been " +
"handled.");
/* arguments which have no argument(s) */
copts.addOption("d", "debug", CmdLineOpts.CAR_ZERO, null,
"Turn on DEBUG mode to get more information printed.");
copts.addOption("h", "help", CmdLineOpts.CAR_ZERO, null,
"Print this help message.");
copts.addOption("q", "quiet", CmdLineOpts.CAR_ZERO, null,
"Suppress printing the welcome header.");
copts.addOption("v", "version", CmdLineOpts.CAR_ZERO, null,
"Print version number and exit.");
} catch (OptionsException oe) {
System.err.println(WELCOME_MSG);
System.err.println("Internal error: " + oe.getMessage());
System.exit(1);
}
return copts;
}
private static void parseOptions(String[] args, CmdLineOpts copts)
{
String usage = WELCOME_MSG +
"Usage: java -jar xrpcwrapper.jar [options]\n" +
"OPTIONS:\n" + copts.produceHelpMessage();
try{
copts.processArgs(args);
if (copts.getOption("help").isPresent() ||
copts.getOption("version").isPresent()) {
System.out.print(usage);
System.exit(0);
} else if (!copts.getOption("command").isPresent()) {
System.err.println(
"ERROR: missing mandatory option: --command\n" +
"Don't know how to execute the XQuery " +
"engine.\n\n" + usage);
System.exit(-1);
} else if (copts.getOption("port").isPresent()){
CmdLineOpts.OptionContainer portOpt =
copts.getOption("port");
String port = portOpt.getArgument();
portOpt.resetArguments();
portOpt.addArgument(port);
} else if (copts.getOption("rootdir").isPresent()){
CmdLineOpts.OptionContainer rootdirOpt =
copts.getOption("rootdir");
String rootdir = rootdirOpt.getArgument();
if(!rootdir.endsWith(FILE_SEPARATOR)) {
rootdir += FILE_SEPARATOR;
}
rootdirOpt.resetArguments();
rootdirOpt.addArgument(rootdir);
}
} catch (OptionsException oe){
System.out.println("Invalide option: " + oe.getMessage());
System.out.println("\n" + usage);
System.exit(1);
}
}
public static void main (String[] args)
{
CmdLineOpts opts = initCmdLineOpts();
parseOptions(args, opts);
XRPCWrapper wrapper = new XRPCWrapper(opts);
wrapper.run();
}
}
--- NEW FILE: XRPCWrapperWorker.java ---
/**
* The contents of this file are subject to the MonetDB Public License
* Version 1.1 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and
* limitations under the License.
* The Original Code is the MonetDB Database System.
* The Initial Developer of the Original Code is CWI.
* Portions created by CWI are Copyright (C) 1997-2007 CWI.
* All Rights Reserved.
**/
package nl.cwi.monetdb.xquery.xrpc.wrapper;
import java.io.*;
import java.net.*;
import javax.xml.xpath.*;
import org.w3c.dom.*;
import nl.cwi.monetdb.util.*;
import nl.cwi.monetdb.xquery.xrpc.api.*;
public class XRPCWrapperWorker extends Thread {
/** Simple counter for XRPC worker threads. */
private static int tid = 0;
private Socket sock;
private CmdLineOpts opts;
private String rootdir;
private String soapPrefix;
private String xrpcPrefix;
private int contentLength;
private boolean debug;
XRPCWrapperWorker(Socket s, CmdLineOpts o)
throws Exception
{
super("XRPCWrapperWorkerThread-" + tid++);
sock = s;
opts = o;
rootdir = o.getOption("rootdir").getArgument();
debug = o.getOption("debug").isPresent();
soapPrefix = "";
xrpcPrefix = "";
}
private void DEBUG(String msg)
{
if(debug) System.out.print(msg);
}
/**
* Generate an XQuery query for the request and write the query
* to a file.
*/
private void generateQuery(String requestFilename,
String queryFilename,
String request)
throws Exception
{
String infoHeader = "INFO: ("+getName()+") generateQuery(): ";
String errHeader = "ERROR: ("+getName()+") generateQuery(): ";
soapPrefix = XRPCMessage.getNamespacePrefix(
XRPCMessage.XRPC_MSG_TYPE_REQ, request,
XRPCMessage.SOAP_NS);
xrpcPrefix = XRPCMessage.getNamespacePrefix(
XRPCMessage.XRPC_MSG_TYPE_REQ, request,
XRPCMessage.XRPC_NS);
NamespaceContextImpl namespaceContext = new NamespaceContextImpl();
namespaceContext.add(soapPrefix, XRPCMessage.SOAP_NS);
namespaceContext.add(xrpcPrefix, XRPCMessage.XRPC_NS);
XPathFactory factory = XPathFactory.newInstance();
XPath xPath = factory.newXPath();
xPath.setNamespaceContext(namespaceContext);
String xPathExpr = "/" + soapPrefix + ":Envelope/" +
soapPrefix + ":Body/" +
xrpcPrefix + ":request";
NamedNodeMap attrs = XRPCMessage.getNodeAttributes(xPath, request,
xPathExpr);
String xqModule =
attrs.getNamedItem(xrpcPrefix+":module").getNodeValue();
String xqMethod =
attrs.getNamedItem(xrpcPrefix+":method").getNodeValue();
String xqLocation =
attrs.getNamedItem(xrpcPrefix+":location").getNodeValue();
String arityStr =
attrs.getNamedItem(xrpcPrefix+":arity").getNodeValue();
int arity = -1;
try{
arity = Integer.parseInt(arityStr);
} catch(NumberFormatException nfe) {
throw new XRPCSenderException("Invalid value of the " +
"\"arity\" attribute: \"" + arityStr + "\": " +
nfe.getMessage());
}
try{
FileWriter fileOut = new FileWriter(queryFilename,
false);
fileOut.write(
"import module namespace modfunc = \""
+ xqModule + "\"" +
" at \"" + xqLocation + "\";\n\n" +
"import module namespace wf =
\"xrpcwrapper\"" +
" at \"" + rootdir + XRPCWrapper.WF_FILE + "\";\n\n" +
"declare namespace env=\"" +
XRPCMessage.SOAP_NS + "\";\n" +
"declare namespace xrpc=\"" +
XRPCMessage.XRPC_NS + "\";\n\n" +
"declare namespace xs=\"" +
XRPCMessage.XS_NS + "\";\n" +
"<env:Envelope" +
" xsi:schemaLocation=\"" +
XRPCMessage.XRPC_NS + " " +
XRPCMessage.XRPC_LOC + "\">" +
"<env:Body>" +
"<xrpc:response" +
" xrpc:module=\"" + xqModule + "\" " +
" xrpc:method=\"" + xqMethod + "\">{\n"
+
" for $call in doc(\"" +
requestFilename + "\")" +
"//" + xrpcPrefix + ":call\n");
for(int i = 1; i <= arity; i++){
/* NB: XQuery index of item sequences starts
from 1 */
fileOut.write(" let $param" + i + " := " +
"wf:n2s(\"" + xrpcPrefix + "\",
" +
"$call/" + xrpcPrefix +
":sequence["+i+"]" +
")\n");
}
fileOut.write(" return wf:s2n(modfunc:"+xqMethod+"(");
for(int i = 1; i <= (arity - 1); i++){
fileOut.write("$param" + i + ", ");
}
if(arity > 0){
fileOut.write("$param" + arity);
}
fileOut.write("))\n}" + XRPCMessage.XRPC_RESPONSE_END);
fileOut.close();
} catch (Exception e){
throw new XRPCReceiverException(
"failed to generate query for the
request.");
}
DEBUG(infoHeader + "query generated in "+queryFilename+"\n");
}
/**
* Call the XQuery engine to execute the query and send the
* results back.
*/
private void execAndSend(String queryFilename,
BufferedWriter sockOut)
throws Exception
{
String infoHeader = "INFO: ("+getName()+") execAndSend(): ";
String errHeader = "ERROR: ("+getName()+") execAndSend(): ";
String command = null;
Process proc = null;
try{
command = opts.getOption("command").getArgument() + " "
+
queryFilename;
DEBUG(infoHeader + "executing: " + command + "\n");
proc = Runtime.getRuntime().exec(command);
} catch (OptionsException oe) {
/* This exception should never happen */
throw new XRPCReceiverException("This should not
happen: " +
"XRPC wrapper does not know how to
execute the " +
"XQuery engine: " + oe.getMessage());
} catch (Exception e){
throw new XRPCReceiverException(
"Failed to start executing command \"" +
command + "\":" + e.getMessage());
}
BufferedReader procIn = new BufferedReader(new
InputStreamReader(proc.getInputStream()));
int c = -1;
try{
c = procIn.read();
} catch (IOException ioe){
System.out.println(infoHeader + "caught IOException " +
"while reading proc's output:");
ioe.printStackTrace();
/* Don't throw exception, try to read proc's
ErrorStream */
c = -1;
}
if (c >= 0){
DEBUG(infoHeader + "query execution seems succeeded: " +
"got output from the process'
InputStream.\n");
DEBUG(infoHeader + "sending XRPC response message:\n");
sockOut.write(XRPCHTTPConnection.HTTP_OK_HEADER);
DEBUG(XRPCHTTPConnection.HTTP_OK_HEADER);
/* Add the XML declaration, if necessary */
char[] cbuf = new char[4];
procIn.read(cbuf, 0, 4);
if((char)c != '<' || cbuf[0] != '?' || cbuf[1] != 'x' || cbuf[2] !=
'm' || cbuf[3] != 'l') {
sockOut.write(XRPCMessage.XML_DECL);
DEBUG(XRPCMessage.XML_DECL);
}
sockOut.write(new Character((char)c).toString());
sockOut.write(cbuf, 0, 4);
DEBUG(new Character((char)c).toString());
DEBUG("" + cbuf[0] + cbuf[1] + cbuf[2] + cbuf[3]);
while ((c = procIn.read()) >= 0) {
sockOut.write(new
Character((char)c).toString());
DEBUG(new Character((char)c).toString());
}
} else { /* send SOAP Fault message */
StringBuffer faultReason = new StringBuffer(8192);
BufferedReader errIn = new BufferedReader(new
InputStreamReader(proc.getErrorStream()));
while ((c = errIn.read()) >= 0) {
faultReason.append(new Character((char)c).toString());
}
errIn.close();
String faultMsg =
XRPCMessage.SOAP_FAULT(soapPrefix+":Receiver",
faultReason.toString());
DEBUG(XRPCHTTPConnection.HTTP_ERR_500_HEADER + faultMsg);
XRPCHTTPConnection.send(sockOut,
XRPCHTTPConnection.HTTP_ERR_500_HEADER, faultMsg);
}
int ret = proc.waitFor();
DEBUG(infoHeader + "query execution exits with: " + ret + "\n");
}
/**
* Main function to handle an XRPC request:
* 1. read the request and write it to a file
* 2. generate an XQuery query for the request and write the
* query to a file
* 3. Call the XQuery engine to execute the query and send the
* results back
*/
private void handleXRPCReq(String request, BufferedWriter sockOut)
throws Exception
{
String infoHeader = "INFO: ("+getName()+") handleXRPCReq(): ";
String warnHeader = "WARNING: ("+getName()+") handleXRPCReq():
";
String requestFilename =
rootdir+"xrpc_request_"+getName()+".xml";
String queryFilename =
rootdir+"xrpc_wrapper_query_"+getName()+".xq";
BufferedWriter fileOut = new BufferedWriter(new
FileWriter(requestFilename, false));
fileOut.write(request.toString());
fileOut.close();
DEBUG(infoHeader +
"request (" + request.length() + " bytes) stored in: " +
requestFilename + "\n");
generateQuery(requestFilename, queryFilename, request);
execAndSend(queryFilename, sockOut);
DEBUG(infoHeader + "DONE: " + sock + "\n\n");
if(opts.getOption("remove").isPresent()){
try {
File fQ = new File(queryFilename);
File fR = new File(requestFilename);
String arg = opts.getOption("remove").getArgument();
if(arg.equals("request")) {
fQ.delete();
DEBUG(infoHeader + "request file \"" +
requestFilename + "\" deleted\n");
} else if(arg.equals("query")) {
fR.delete();
DEBUG(infoHeader + "query file \"" +
queryFilename + "\" deleted\n");
} else if(arg.equals("all")) {
fQ.delete();
fR.delete();
DEBUG(infoHeader + "query file \"" +
queryFilename + "\" deleted\n");
DEBUG(infoHeader + "request file \"" +
requestFilename + "\" deleted\n");
}
} catch (Exception e) {
System.out.println(warnHeader +
"failed to delete temporary file(s):");
e.printStackTrace();
}
}
}
public void run()
{
String warnHeader = "WARNING: ("+getName()+") run(): ";
String errHeader = "ERROR: ("+getName()+") run(): ";
BufferedReader sockIn = null;
BufferedWriter sockOut = null;
try{
try{
sockIn = new BufferedReader(new
InputStreamReader(sock.getInputStream()));
sockOut = new BufferedWriter(new
OutputStreamWriter(sock.getOutputStream()));
String reqMsg = XRPCHTTPConnection.receive(sockIn,
XRPCWrapper.XRPCD_CALLBACK);
handleXRPCReq(reqMsg, sockOut);
} catch (XRPCSenderException se) {
String faultMsg =
XRPCMessage.SOAP_FAULT(soapPrefix+":Sender",
se.getMessage());
XRPCHTTPConnection.send(sockOut,
XRPCHTTPConnection.HTTP_ERR_404_HEADER, faultMsg);
System.err.println(errHeader + "caught exception:");
se.printStackTrace();
System.err.println(errHeader + "sent SOAP Fault message:");
DEBUG(XRPCHTTPConnection.HTTP_ERR_404_HEADER + faultMsg);
} catch (XRPCReceiverException re) {
String faultMsg =
XRPCMessage.SOAP_FAULT(soapPrefix+":Receiver",
re.getMessage());
XRPCHTTPConnection.send(sockOut,
XRPCHTTPConnection.HTTP_ERR_500_HEADER, faultMsg);
System.err.println(errHeader + "caught exception:");
re.printStackTrace();
System.err.println(errHeader + "sent SOAP Fault message:");
DEBUG(XRPCHTTPConnection.HTTP_ERR_500_HEADER + faultMsg);
}
} catch (Exception e1){
System.err.println(errHeader + "caught exception:");
e1.printStackTrace();
} finally {
try{
sockOut.close();
sockIn.close();
sock.close();
} catch (Exception e2) {
System.err.println(warnHeader +
"caught exception in FINAL
block:");
e2.printStackTrace();
}
}
}
}
--- NEW FILE: wrapper_functions.xq ---
module namespace wrapper = "xrpcwrapper";
declare namespace xrpcm = "http://monetdb.cwi.nl/XQuery";
declare function wrapper:add-type($val as item()) as node()
{
typeswitch ($val)
case xs:boolean return
<xrpcm:atomic-value xsi:type="xs:boolean">
{$val}
</xrpcm:atomic-value>
case xs:integer return
<xrpcm:atomic-value xsi:type="xs:integer">
{$val}
</xrpcm:atomic-value>
case xs:decimal return
<xrpcm:atomic-value xsi:type="xs:decimal">
{$val}
</xrpcm:atomic-value>
case xs:float return
(: pathfinder does not support xs:float yet, so replace it
: with xs:double :)
<xrpcm:atomic-value xsi:type="xs:double">
{$val}
</xrpcm:atomic-value>
case xs:double return
<xrpcm:atomic-value xsi:type="xs:double">
{$val}
</xrpcm:atomic-value>
case xs:anyURI return
<xrpcm:atomic-value xsi:type="xs:anyURI">
{$val}
</xrpcm:atomic-value>
case xs:string return
<xrpcm:atomic-value xsi:type="xs:string">
{$val}
</xrpcm:atomic-value>
case xs:untypedAtomic return
<xrpcm:atomic-value xsi:type="xs:untypedAtomic">
{$val}
</xrpcm:atomic-value>
case xs:anyAtomicType return
<xrpcm:atomic-value xsi:type="xs:anyAtomicType">
{$val}
</xrpcm:atomic-value>
case attribute() return
<xrpcm:attribute>{$val}</xrpcm:attribute>
case comment() return
<xrpcm:comment>{$val}</xrpcm:comment>
case document-node() return
<xrpcm:document>{$val}</xrpcm:document>
case element() return
<xrpcm:element>{$val}</xrpcm:element>
case processing-instruction() return
<xrpcm:processing-instruction>
{$val}
</xrpcm:processing-instruction>
case text() return
<xrpcm:text>{$val}</xrpcm:text>
case node() return
<xrpcm:node>{$val}</xrpcm:node>
default return
<xrpcm:item>{$val}</xrpcm:item>
};
declare function wrapper:s2n($seq as item()*) as node()
{
if(empty($seq)) then <xrpcm:sequence/>
else
<xrpcm:sequence> {
for $val in $seq
return wrapper:add-type($val)
} </xrpcm:sequence>
};
declare function wrapper:udf-s2n($seq as item()*,
$qn as xs:string) as node()
{
<xrpcm:sequence> {
for $val in $seq
return <xrpcm:element xsi:type="{$qn}">{$val}</xrpcm:element>
} </xrpcm:sequence>
};
declare function wrapper:n2s($nsid as xs:string, $nodes as node()*) as item()*
{
for $typenode in $nodes/child::*
return
if($typenode/name() = concat($nsid, ":atomic-value")) then
if (string($typenode/@xsi:type) = "xs:boolean") then
$typenode cast as xs:boolean
else if (string($typenode/@xsi:type) = "xs:integer") then
$typenode cast as xs:integer
else if (string($typenode/@xsi:type) = "xs:decimal") then
$typenode cast as xs:decimal
else if (string($typenode/@xsi:type) = "xs:double") then
$typenode cast as xs:double
else if (string($typenode/@xsi:type) = "xs:string") then
$typenode/text()
else if (string($typenode/@xsi:type) = "xs:anySimpleType") then
$typenode/text()
else
$typenode/text()
else if ($typenode/name() = concat($nsid, ":attribute")) then
let $attr-name := $typenode/attribute::*[1]/name()
return attribute {$attr-name} { $typenode/attribute::* }
else if ($typenode/name() = concat($nsid, ":comment")) then
comment {$typenode/comment()}
else if ($typenode/name() = concat($nsid, ":document")) then
document {$typenode/child::*}
else if ($typenode/name() = concat($nsid, ":element") or
$typenode/name() = concat($nsid, ":node") or
$typenode/name() = concat($nsid, ":anyNode") or
$typenode/name() = concat($nsid, ":item")) then
element {$typenode/child::*[1]/name()}
{$typenode/child::*[1]/child::*}
else if ($typenode/name() =
concat($nsid, ":processing-instruction")) then
let $pi := $typenode/processing-instruction()
return processing-instruction { $pi/name() } {$pi}
else if ($typenode/name() = concat($nsid, ":text")) then
text {$typenode/text()}
else ()
};
-------------------------------------------------------------------------
SF.Net email is sponsored by: The Future of Linux Business White Paper
from Novell. From the desktop to the data center, Linux is going
mainstream. Let it simplify your IT future.
http://altfarm.mediaplex.com/ad/ck/8857-50307-18918-4
_______________________________________________
Monetdb-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-checkins