Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv11815/runtime

Modified Files:
      Tag: xrpcdemo
        Makefile.ag pathfinder.mx pf_support.mx serialize_dflt.mx 
        shredder.mx xrpc_client.mx xrpc_common.mx xrpc_server.mx 
Log Message:
Ongoing implementation for the SIGMOD demo (now to the right branch)



U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.1
retrieving revision 1.416.2.1.2.2
diff -u -d -r1.416.2.1.2.1 -r1.416.2.1.2.2
--- pathfinder.mx       25 May 2008 22:44:02 -0000      1.416.2.1.2.1
+++ pathfinder.mx       25 May 2008 22:55:07 -0000      1.416.2.1.2.2
@@ -94,6 +94,7 @@
 module(pf_support);
 module(logger);
 module(mkey);
+module(xrpc_common);
 module(xrpc_server);
 module(xrpc_client);
 module(pf_standoff);
@@ -340,29 +341,30 @@
 @:[EMAIL PROTECTED](CONT_NAME,         41, void, str, void, void, oid_nil)@
 @:[EMAIL PROTECTED](CONT_RUNTIME,      42, void, bat, void, void, oid_nil)@
 @:[EMAIL PROTECTED](CONT_LOCKED,       43, void, lock,void, void, oid_nil)@
+@:[EMAIL PROTECTED](XRPC_SUCCESSOR,    44, void, str, void, void, oid_nil)@
 
-@:[EMAIL PROTECTED](_MAP_PID,          44, void, bat, void,  oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_SIZE,         45, void, bat, void,  int, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_LEVEL,        46, void, bat, void,  chr, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_PROP,         47, void, bat, void,  oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_KIND,         48, void, bat, void,  chr, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_NID,          49, void, bat, void,  oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_NID_RID,          50, void, bat, void,  oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_FRAG_ROOT,        51, void, bat,  oid,  oid, oid_nil)@
-@:[EMAIL PROTECTED](_QN_HISTOGRAM,     52, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_PREFIX_URI_LOC,53, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_URI_LOC,       54, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_PREFIX,        55, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_URI,           56, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_LOC,           57, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_TEXT,        58, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_COM,         59, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_INS,         60, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_TGT,         61, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_VAL,         62, void, bat, void,  str, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_OWN,         63, void, bat, void,  oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_QN,          64, void, bat, void,  oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_PROP,        65, void, bat, void,  oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_MAP_PID,          45, void, bat, void,  oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_SIZE,         46, void, bat, void,  int, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_LEVEL,        47, void, bat, void,  chr, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_PROP,         48, void, bat, void,  oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_KIND,         49, void, bat, void,  chr, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_NID,          50, void, bat, void,  oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_NID_RID,          51, void, bat, void,  oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_FRAG_ROOT,        52, void, bat,  oid,  oid, oid_nil)@
+@:[EMAIL PROTECTED](_QN_HISTOGRAM,     53, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_PREFIX_URI_LOC,54, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_URI_LOC,       55, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_PREFIX,        56, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_URI,           57, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_LOC,           58, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_TEXT,        59, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_COM,         60, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_INS,         61, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_TGT,         62, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_VAL,         63, void, bat, void,  str, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_OWN,         64, void, bat, void,  oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_QN,          65, void, bat, void,  oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_PROP,        66, void, bat, void,  oid, PRE_BASE)@
 @-
 The bottom segment are the master copies of the top segment.
 The middle segment is private to the query.
@@ -372,7 +374,7 @@
 BEWARE: the logger version number below needs to be incremented whenever the 
working set schema is changed.
 
 @= ws_decl
-@:[EMAIL PROTECTED](WS_SIZE, 66)@
+@:[EMAIL PROTECTED](WS_SIZE, 67)@
 @:[EMAIL PROTECTED](QNAME,    2)@
 @:[EMAIL PROTECTED](BOOL,     3)@
 @:[EMAIL PROTECTED](INT,      4)@
@@ -394,7 +396,7 @@
 @:ws_decl(mil)@
 
 # logger version number, needs to be incremented whenever the working set 
schema is changed
-const LOGGER_VERSION := 1;
+const LOGGER_VERSION := 2;
 
 # KIND constants, carefully chosen
 # atomic value items can be retrieved with 'kind.select(int_nil,ATOMIC)'
@@ -568,7 +570,7 @@
 # there is just a global lock for this.
 var pf_extend := pflock_get(3); # protects the counter
 var pf_extend_cnt := 0; # counter is used by nonexclusive acces (first gets 
barrier, last releases)
-var pf_extend_barrier := sema(ptr(pflock_get(4))); # the barrier
+var pf_extend_barrier := sema(ptr(pflock_get(9))); # the barrier
 
 # master bat *UPDATES* must be protected against checkpoints (global BBP 
subcommits + log restart)
 # master updates take a non-exclusive pf_chkpt lock; pf_checkpoint() takes an 
exclusive access
@@ -633,6 +635,11 @@
     commitBAT.append("doc_collection");
     commitBAT.append("doc_timestamp");
     commitBAT.append("uri_lifetime");
+    commitBAT.append("xrpc_qids");
+    commitBAT.append("xrpc_wsids");
+    commitBAT.append("xrpc_timeouts");
+    commitBAT.append("xrpc_locks");
+    commitBAT.append("xrpc_statuss");
     var ok := false;
     lock_set(pf_wal);
     var err := CATCH(ok := subcommit(commitBAT));
@@ -710,6 +717,14 @@
 
 var uri_lifetime;     # caching rules
 
+# meta-data BATs for multi-request XRPC transactions
+var xrpc_qids;        # bat[void,str] query-id (host|timestamp)
+var xrpc_timeouts;    # bat[void,lng] query timeout in msec
+var xrpc_wsids;       # bat[void,lng] ID of the workingset associated with 
this query
+var xrpc_locks;       # bat[void,lock] lock for the workingset of this query 
+var xrpc_statuss;     # bat[void,str] 2PC status of this query: "exec", 
"prepare", "commit" or "abort"
+var xrpclock := pflock_get(6); # master XRPC lock, to protect above xrpc_* BATs
+
 # initialize persistent bats (use BBP as global mechanism to discover 
initialization)
 if (isnil(CATCH(bat("doc_name").count()))) {
     collection_name  := bat("collection_name");
@@ -719,6 +734,11 @@
     doc_location     := bat("doc_location");
     doc_timestamp    := bat("doc_timestamp");
     uri_lifetime     := bat("uri_lifetime");
+    xrpc_qids        := bat("xrpc_qids");
+    xrpc_timeouts    := bat("xrpc_timeouts");
+    xrpc_wsids       := bat("xrpc_wsids");
+    xrpc_locks       := bat("xrpc_locks");
+    xrpc_statuss     := bat("xrpc_statuss");
 
     # check that this database was created with a compatible pagesize as 
requested now
     var pagebits := 16; 
@@ -734,6 +754,11 @@
     doc_location    := new(oid,str).persists(true).rename("doc_location");
     doc_timestamp   := 
new(oid,timestamp).persists(true).rename("doc_timestamp");
     uri_lifetime    := new(str,lng).insert("tmp", 
1LL).persists(true).rename("uri_lifetime");
+    xrpc_qids       := new(void,str).seqbase([EMAIL 
PROTECTED]).persists(true).rename("xrpc_qids");
+    xrpc_timeouts   := new(void,lng).seqbase([EMAIL 
PROTECTED]).persists(true).rename("xrpc_timeouts");
+    xrpc_wsids      := new(void,lng).seqbase([EMAIL 
PROTECTED]).persists(true).rename("xrpc_wsids");
+    xrpc_locks      := new(void,lock).seqbase([EMAIL 
PROTECTED]).persists(true).rename("xrpc_locks");
+    xrpc_statuss    := new(void,str).seqbase([EMAIL 
PROTECTED]).persists(true).rename("xrpc_statuss");
     doc_collection.insert(DOCID_CURID_HACK, DOCID_MIN); # hack: store next oid 
as in invalid tuple
     doc_collection.insert(DOCID_PGBIT_HACK, oid(REMAP_PAGE_BITS)); # hack: 
store pagesize also
     pf_checkpoint(bat(void,str).append("uri_lifetime"));
@@ -4016,7 +4041,7 @@
 
 /* exports for XRPC */
 pathfinder_export void  xquery_client_engine(mapi_client*);
-pathfinder_export char* xquery_method(mapi_client*, int, char*, char*, char*, 
lng, lng, lng**, str*, str*, BAT*);
+pathfinder_export char* xquery_method(mapi_client*, int, char*, lng, char*, 
char*, char*, lng, lng, lng**, str*, str*, BAT*);
 pathfinder_export void  xquery_client_end(mapi_client *, char *); 
 
 #endif
@@ -4494,7 +4519,7 @@
  * - resolve a method call in the current xquery context (return NULL if not 
resolved)
  *
  * char*
- * xquery_function_call(xquery_client *ctx, lng usec, char *ns, char *method, 
+ * xquery_function_call(xquery_client *ctx, lng usec, char* qid, lng timeout, 
char *ns, char *method, 
  *                      int argc, int itercnt, int** argcnt, char** argtpe, 
char** argval, BAT *shredBAT)
  * - call a function ns:method(). try to use the function cache (ie re-use a 
cached MIL tree).
  *   otherwise generate MIL yourself, interpret it (and cache it). Returns 
error string (NULL if ok).
@@ -4664,6 +4689,8 @@
 static char* 
 xquery_function_call(xquery_client *ctx, 
                      lng usec, 
+                     char *qid,
+                     lng timeout,
                      char *ns, 
                      char *method, 
                      lng argc, 
@@ -4717,11 +4744,16 @@
     if (fun->mil == NULL) {
         MT_set_lock(pf_cache_lock, "xquery_function_call");
         if (fun->mil == NULL) {
-            /* create working set */
             int ret;
- 
-            src = (char*) PFstartMIL(fun->sig->update);
-            while(*src && cur < end) *cur++ = *src++;
+
+            /* create working set */
+            if(qid && *qid) { /* multi-request XRPC transaction */
+                ret = PFstartMIL_XRPCTrans(cur, XQUERY_BUFSIZE-(cur-mil), 
fun->sig->update, qid, timeout);
+                if(ret > 0) cur += ret;
+            } else {
+                src = (char*) PFstartMIL(fun->sig->update);
+                while(*src && cur < end) *cur++ = *src++;
+            }
 
             if (shredBAT) {
                 /* add shredded RPC request message to the working set */
@@ -4737,7 +4769,11 @@
             if (ret > 0) cur += ret;
 
             /* destroy working set */
-            ret = snprintf(cur, XQUERY_BUFSIZE-(cur-mil), 
PFstopMIL(fun->sig->update));
+            if(qid && *qid){ /* multi-request XRPC transaction */
+                ret = PFstopMIL_XRPCTrans(cur, XQUERY_BUFSIZE-(cur-mil), 
fun->sig->update, qid);
+            } else {
+                ret = snprintf(cur, XQUERY_BUFSIZE-(cur-mil), 
PFstopMIL(fun->sig->update));
+            }
             mil[XQUERY_BUFSIZE - 1] = 0;
             if (ret > 0) cur += ret;
 
@@ -4848,7 +4884,7 @@
         stream_write(ctx->fderr, fun->mil, strlen(fun->mil), 1);
     }
 
-    /* set the MIL shredBAT and getType variables to the actual values */
+    /* set the MIL shredBAT and genType variables to the actual values */
     ctx->shredBAT[0] = shredBAT?shredBAT->batCacheid:0;
     *(ctx->time_compile) = GDKusec() - usec;
     
@@ -5731,7 +5767,7 @@
                     lng* cnt_ptr = cnt;
                     char nsbak = *nsend, locbak = *locend;
                     *nsend = 0; *locend = 0;
-                    err = xquery_function_call(ctx, usec, ns, loc, argc, 1, 
&cnt_ptr, tpe, param, NULL);
+                    err = xquery_function_call(ctx, usec, NULL, 0, ns, loc, 
argc, 1, &cnt_ptr, tpe, param, NULL);
                     *nsend = nsbak; *locend = locbak;
                 }
             }
@@ -6065,6 +6101,7 @@
 
 /* xquery_method : execute a loop-lifted xquery function
  *
+ * flags         = 1: timing, 2: serialization mode, 4: debug
  * argc          = #params
  * itercnt       = #iterations
  * argcnt[iter]  = #items per param
@@ -6079,6 +6116,8 @@
 char*
 xquery_method(mapi_client *mc,
               int flags,
+              char* qid,
+              lng timeout,
               char* module,
               char* uri,
               char* method,
@@ -6106,8 +6145,7 @@
     err = xquery_change_genType(ctx, mode);
     if (err) return err;
 
-    if (argc >= 1000) {
-        /* hack: pass argc+1000 and you get debug output */
+    if (flags&4) { /* write debug output */
         s = open_wastream("/tmp/xrpc.mil");
         char *prologue = (char*) PFinitMIL();
         if (s) {
@@ -6115,14 +6153,13 @@
             ctx->mode |= XQ_DEBUG;
         }
         stream_write(ctx->fderr, prologue, strlen(prologue), 1);
-        argc = argc % 1000;
     }
     if (err == NULL && module){
         err = xquery_module_load(ctx, ns="xrpc", module, uri); 
     }
 
     if (err == NULL) { 
-        err = xquery_function_call(ctx, usec, ns, method, argc, itercnt, 
argcnt, argtpe, argval, shredBAT);
+        err = xquery_function_call(ctx, usec, qid, timeout, ns, method, argc, 
itercnt, argcnt, argtpe, argval, shredBAT);
         if (err == (char*) -1) err = "xquery_method: function could not be 
resolved.\n";
         else if (err == xquery_function_error) err = 
xquery_nondescriptive_error;
     }

U xrpc_common.mx
Index: xrpc_common.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_common.mx,v
retrieving revision 1.1.8.1
retrieving revision 1.1.8.2
diff -u -d -r1.1.8.1 -r1.1.8.2
--- xrpc_common.mx      25 May 2008 22:44:07 -0000      1.1.8.1
+++ xrpc_common.mx      25 May 2008 22:55:11 -0000      1.1.8.2
@@ -26,6 +26,14 @@
 @a Ying Zhang
 @t Includes header files, definitions shared by the XRPC server and XRPC client
 
[EMAIL PROTECTED]
+.MODULE xrpc_common;
+
+.COMMAND my_hostname() : str = CMDmy_hostname;
+"Returns the hostname of the localhost."
+
+.END xrpc_common;
+
 @h
 #ifndef XRPC_COMMON_H
 #define XRPC_COMMON_H
@@ -46,14 +54,20 @@
     #include <sys/select.h>
     #include <sys/types.h>     /* used by socket */
     #include <sys/socket.h>
-    #include <unistd.h>
+    #include <unistd.h> /* gethostname() */
     #include <netinet/in.h> /* hton and ntoh */
     #include <arpa/inet.h>  /* dotted IP addr to and from 32-bits int */
-    #include <netdb.h>      /* convert domain names into IP addr */
+    #include <netdb.h>      /* gethostbyname(), h_errno */
     #include <errno.h>
     #include <ctype.h>
 #endif
 
+#define ISOLATION_NONE          1
+#define ISOLATION_REPEATABLE    2
+#define ISOLATION_2PC           3
+
+#define XRPCLOCK_IDX            6
+
 #define XRPC_REQ_CALLBACK       "/xrpc"
 
 #define MXQ_ADMIN   "http://monetdb.cwi.nl/XQuery/admin/";
@@ -79,6 +93,7 @@
                                      " xrpc:arity=\"%lld\""         \
                                      " xrpc:iter-count=\"%lld\""    \
                                      " xrpc:updCall=\"%s\">"
+#define XRPC_QID "<xrpc:queryID xrpc:host=\"%s\" xrpc:timestamp=\"%s\" 
xrpc:timeout=\"%s\"/>"
 
 #define XRPC_HTTP_CALL "<xrpc:call>"                                    \
                          "<xrpc:sequence>"                              \
@@ -103,5 +118,178 @@
                       "</env:Body>"         \
                     "</env:Envelope>\n"
 
+/* exports for xrpc_server.mx and serialize_dflt.mx */
+int check_timeout_by_idx(oid qid_idx);
+int check_timeout_by_qid(char *qid, lng timestamp, lng timeout);
+
 #endif /* XRPC_COMMON_H */
 
[EMAIL PROTECTED]
+#include "pf_config.h"
+#include "xrpc_common.h"
+#include "pf_support.h"
+
+int CMDmy_hostname(char **res)
+{
+    int ret = 0, len = HOST_NAME_MAX > 255 ? HOST_NAME_MAX : 255;
+    char err[1024];
+
+    char *hname = GDKmalloc(len);
+    if(!hname) {
+        GDKerror("CMDmy_hostname: failed to malloc 'hname'\n");
+        return GDK_FAIL;
+    }
+    
+    errno = 0;
+    ret = gethostname(hname, len);
+    if(ret < 0) {
+        snprintf(err, 1024, "CMDmy_hostname: gethostname() failed: %s.\n", 
strerror(errno));
+        return GDK_FAIL;
+    }
+
+    *res = hname;
+    return GDK_SUCCEED;
+}
+
+/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
+int
+check_timeout_by_qid(char *qid, lng timestamp, lng timeout)
+{
+    MT_Lock *xrpclock = NULL;
+    int xrpclock_idx = XRPCLOCK_IDX;
+    BAT *xrpc_qids = NULL, *xrpc_statuss = NULL;
+    BATiter qidsi;
+    oid qid_idx = oid_nil;
+    BUN bun_qid = BUN_NONE;
+
+    if((timestamp + (timeout * 1000)) > GDKusec())
+        return 0; /* not timed out */
+
+    if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL) {
+        GDKerror("check_timeout_by_qid: "
+                 "failed to get the global lock 'xrpclock'!\n");
+        return -2;
+    }
+    MT_lock_set(xrpclock, "check_timeout_by_qid");
+
+    xrpc_qids = BATdescriptor(BBPindex("xrpc_qids"));
+    xrpc_statuss = BATdescriptor(BBPindex("xrpc_statuss"));
+    if(!xrpc_qids || !xrpc_statuss) {
+        if(xrpc_qids) BBPunfix(BBPcacheid(xrpc_qids));
+        if(xrpc_statuss) BBPunfix(BBPcacheid(xrpc_statuss));
+        MT_lock_unset(xrpclock, "check_timeout_by_qid");
+        GDKerror("check_timeout_by_qid: "
+                 "could not find the xrpc_* meta-BATs!\n");
+        return -2;
+    }
+
+    if((bun_qid = BUNfnd(BATmirror(xrpc_qids), qid)) ==  BUN_NONE) {
+        BBPunfix(BBPcacheid(xrpc_qids));
+        BBPunfix(BBPcacheid(xrpc_statuss));
+        MT_lock_unset(xrpclock, "check_timeout_by_qid");
+        GDKerror("check_timeout_by_qid: "
+                 "could not find QID \"%s\" in the meta-BAT "
+                 "\"xrpc_qids\"\n", qid);
+        return -1;
+    }
+
+    qidsi = bat_iterator(xrpc_qids);
+    qid_idx =  *(oid*)BUNtail(qidsi, bun_qid);
+    if(!BUNreplace(xrpc_statuss, &qid_idx, "timedout", FALSE)){
+        BBPunfix(BBPcacheid(xrpc_qids));
+        BBPunfix(BBPcacheid(xrpc_statuss));
+        MT_lock_unset(xrpclock, "check_timeout_by_qid");
+        GDKerror("check_timeout_by_qid: "
+                 "failed to change the status of query \"%s\" into "
+                 "\"timedout\"\n", qid);
+        return -2;
+    }
+
+    BBPunfix(BBPcacheid(xrpc_qids));
+    BBPunfix(BBPcacheid(xrpc_statuss));
+    MT_lock_unset(xrpclock, "check_timeout_by_qid");
+    return 1; /* timed out */
+}
+
+/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
+int
+check_timeout_by_idx(oid qid_idx)
+{
+    MT_Lock *xrpclock = NULL;
+    int xrpclock_idx = XRPCLOCK_IDX, ret = -2;
+    BAT *xrpc_qids = NULL, *xrpc_timeouts = NULL, *xrpc_statuss = NULL;
+    BATiter qidsi, timeoutsi;
+    BUN bun_qid = BUN_NONE, bun_to = BUN_NONE;
+    char *qid = NULL, *ts_str = NULL, *to_str = NULL;
+    lng ts = 0, to = 0;
+    char err[1024];
+
+    if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL) {
+        GDKerror("check_timeout_by_idx: "
+                 "failed to get the global lock 'xrpclock'!\n");
+        return ret;
+    }
+
+    MT_lock_set(xrpclock, "check_timeout_by_idx");
+
+    xrpc_qids = BATdescriptor(BBPindex("xrpc_qids"));
+    xrpc_timeouts = BATdescriptor(BBPindex("xrpc_timeouts"));
+    xrpc_statuss = BATdescriptor(BBPindex("xrpc_statuss"));
+    if(!xrpc_qids || !xrpc_timeouts || !xrpc_statuss) {
+        snprintf(err, 1024, "check_timeout_by_idx: failed to retrieve "
+                            "the xrpc_* meta-BATs!\n");
+        goto unlock_ret;
+    }
+
+    qidsi = bat_iterator(xrpc_qids);
+    timeoutsi = bat_iterator(xrpc_timeouts);
+    BUNfndVOID(bun_qid, qidsi, &qid_idx);
+    BUNfndVOID(bun_to, timeoutsi, &qid_idx);
+    if(bun_qid == BUN_NONE || bun_to == BUN_NONE) {
+        snprintf(err, 1024, "check_timeout_by_idx: "OIDFMT"-th BUN not "
+                            "found in the xrpc_* meta-BATs!\n", qid_idx);
+
+        ret = -1;
+        goto unlock_ret;
+    }
+
+    qid = (char*)BUNtail(qidsi, bun_qid);
+    to_str = (char*)BUNtail(timeoutsi, bun_to); assert(qid && to_str);
+    ts_str = strrchr(qid, '|'); assert(ts_str);
+    errno = 0; ts = strtoll(ts_str, NULL, 10);
+    if(errno) {
+        snprintf(err, 1024, "check_timeout_by_idx: invalid value of "
+                            "timestamp: \"%s\"!\n", ts_str);
+        goto unlock_ret;
+    }
+    errno = 0; to = strtoll(to_str, NULL, 10);
+    if(errno) {
+        snprintf(err, 1024, "check_timeout_by_idx: invalid value of "
+                            "timeout: \"%s\"!\n", to_str);
+        goto unlock_ret;
+    }
+
+    if((ts + (to * 1000)) > GDKusec()) {
+        ret = 0; /* not timed out */
+        goto unlock_ret;
+    }
+
+    if(!BUNreplace(xrpc_statuss, &qid_idx, "timedout", FALSE)){
+        snprintf(err, 1024, "check_timeout_by_idx: failed to change "
+                            "the status of query \"%s\" into "
+                            "\"timedout\"\n", qid);
+        goto unlock_ret;
+    }
+
+    ret = 1; /* timed out */
+    goto unlock_ret;
+
+unlock_ret:
+    if(xrpc_qids) BBPunfix(BBPcacheid(xrpc_qids));
+    if(xrpc_timeouts) BBPunfix(BBPcacheid(xrpc_timeouts));
+    if(xrpc_statuss) BBPunfix(BBPcacheid(xrpc_statuss));
+    MT_lock_unset(xrpclock, "check_timeout_by_idx");
+    if(ret < 0)
+        GDKerror(err);
+    return ret;
+}

U pf_support.mx
Index: pf_support.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pf_support.mx,v
retrieving revision 1.299
retrieving revision 1.299.4.1
diff -u -d -r1.299 -r1.299.4.1
--- pf_support.mx       22 May 2008 12:19:28 -0000      1.299
+++ pf_support.mx       25 May 2008 22:55:08 -0000      1.299.4.1
@@ -8111,7 +8111,7 @@
 @c
 #define PF_CONVOY 5
 BAT* ws_overlaps_ws = NULL;
-MT_Lock pf_runtime_lock[6];
+MT_Lock pf_runtime_lock[7];
 MT_Sema pf_runtime_sema[3];
 int pf_nreaders, pf_convoy, pf_ndocmgt;
 lng pf_writer, pf_special;
@@ -8122,6 +8122,7 @@
 #define PF_EXTEND_LOCK       pf_runtime_lock[3]
 #define PF_META_LOCK         pf_runtime_lock[4]
 #define PF_UPDATE_LOCK       pf_runtime_lock[5]
+#define PF_XRPC_LOCK         pf_runtime_lock[6]
 #define PF_META_BARRIER      pf_runtime_sema[0]
 #define PF_UPDATE_BARRIER    pf_runtime_sema[1]
 #define PF_EXTEND_BARRIER    pf_runtime_sema[2]
@@ -8133,8 +8134,8 @@
 
 /* get handle to the shared locks */
 int CMDpflock_get(ptr *ret, int* nr) {
-    *ret = (ptr) &pf_runtime_lock[(*nr)&3];
-    if (*nr > 3) *ret = (ptr) &PF_EXTEND_BARRIER; /* hack: do not feel like 
creating a CMDsema_get */
+    *ret = (ptr) &pf_runtime_lock[(*nr)&6];
+    if (*nr > 6) *ret = (ptr) &PF_EXTEND_BARRIER; /* hack: do not feel like 
creating a CMDsema_get */
     return GDK_SUCCEED;
 }
 

U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.1
retrieving revision 1.45.4.2
diff -u -d -r1.45.4.1 -r1.45.4.2
--- xrpc_client.mx      25 May 2008 22:44:05 -0000      1.45.4.1
+++ xrpc_client.mx      25 May 2008 22:55:11 -0000      1.45.4.2
@@ -40,14 +40,19 @@
 
 @- HTTP client function(s)
 @m
+
+.USE lock;
+
 .COMMAND http_post(
         str options,
+        str qid,
+        lng timeout,
[...1311 lines suppressed...]
+        GDKerror("do_simple_query: failed to create socket_rastream\n");
         return GDK_FAIL;
+    }
+    shredBAT = response2bat(timing, in, dst, port, *updCall);
+
+    stream_close(out); stream_destroy(out);
+    stream_close(in); stream_destroy(in);
+    buffer_destroy(req);
+
+    if(!shredBAT) return GDK_FAIL;
+
+    if(qid && *qid) {
+        if(add_xrpc_hosts(shredBAT, ws) == GDK_FAIL) {
+            BBPreclaim(shredBAT);
+            return GDK_FAIL;
+        }
+    }
 
     *res = shredBAT;
     return GDK_SUCCEED;

U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46
retrieving revision 1.46.4.1
diff -u -d -r1.46 -r1.46.4.1
--- serialize_dflt.mx   7 Apr 2008 12:54:13 -0000       1.46
+++ serialize_dflt.mx   25 May 2008 22:55:09 -0000      1.46.4.1
@@ -40,7 +40,7 @@
 /* contains dummy callback functions */
 #include "serialize_null.h"
 #include "pathfinder.h"
-#include "xrpc_common.h"
+#include "xrpc_common.h" /* *_NS defs, check_timeout_by_idx() */
 
 /* a lot of characters, static strings, and their sizes 
    (the idea is to reuse constant character pointers during
@@ -996,23 +996,147 @@
     return SUCCESS;
 }
 
+static bool
+xrpc_serializeQID(XqueryCtx *ctx, char *qid_idx)
+{
+    MT_Lock *xrpclock = NULL;
+    int xrpclock_idx = XRPCLOCK_IDX, ret = 0;
+    BAT *xrpc_qids = NULL, *xrpc_timeouts = NULL;
+    BATiter qidi, toi;
+    char *qid = NULL, *ts = NULL;
+    lng timeout = 0;
+    BUN bun_qid = BUN_NONE, bun_to = BUN_NONE;
+    oid idx = oid_nil;
+
+    if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL)
+        return PROBLEM;
+
+    MT_lock_set(xrpclock, "xrpc_serializeQID");
+    if((xrpc_qids = BATdescriptor(BBPindex("xrpc_qids")))) {
+        if((xrpc_timeouts = BATdescriptor(BBPindex("xrpc_timeouts")))) {
+            errno = 0;
+            ret = strtol(qid_idx, NULL, 10);
+            if(errno) {
+                GDKerror(strerror(errno));
+                goto err_ret;
+            }
+            
+            idx = (oid) ret;
+            qidi = bat_iterator(xrpc_qids);
+            toi = bat_iterator(xrpc_timeouts);
+            BUNfndVOID(bun_qid, qidi, &idx);
+            BUNfndVOID(bun_to, toi, &idx);
+            if(bun_qid == BUN_NONE || bun_to == BUN_NONE)
+                goto err_ret;
+            
+            qid = (char*)BUNtail(qidi, bun_qid);
+            timeout = *(lng*)BUNtail(toi, bun_to);
+            ts = strrchr(qid, '|');  assert(ts);
+            ts[0] = '\0'; ts++; /* separate 'timestamp' from 'host:port' */
+            
+            ret = stream_printf(ctx->out,
+                    "<xrpc:queryID host=\"%s\" timestamp=\"%s\" 
timeout=\""LLFMT"\"/>",
+                    qid, ts, timeout);
+            
+            ts--; ts[0] = '|';
+
+            if(ret < 0) goto err_ret;
+            BBPunfix(BBPcacheid(xrpc_qids));
+            BBPunfix(BBPcacheid(xrpc_timeouts));
+            MT_lock_unset(xrpclock, "xrpc_serializeQID");
+            return SUCCESS;
+
+        }
+        BBPunfix(BBPcacheid(xrpc_qids));
+    }
+    MT_lock_unset(xrpclock, "xrpc_serializeQID");
+    return PROBLEM;
+
+err_ret:
+    BBPunfix(BBPcacheid(xrpc_qids));
+    BBPunfix(BBPcacheid(xrpc_timeouts));
+    MT_lock_unset(xrpclock, "xrpc_serializeQID");
+    return PROBLEM;
+}
+
+static bool
+xrpc_serializeSuccessors(XqueryCtx *ctx, BAT *sucsBAT)
+{
+    oid i, cnt = BATcount(sucsBAT);
+    char *host = NULL;
+    BATiter sucsBATi = bat_iterator(sucsBAT);
+
+    if(stream_write(ctx->out, "<xrpc:participants>", 1, 19) != 19)
+        return PROBLEM;
+    for(i = 0; i < cnt; i++){
+        host = (char *) BUNtail(sucsBATi, i);
+        stream_printf(ctx->out, "<xrpc:participant>%s</xrpc:participant>", 
host);
+    }
+    if(stream_write(ctx->out, "</xrpc:participants>", 1, 20) != 20)
+        return PROBLEM;
+
+    return SUCCESS;
+}
+
 /**
  *  Start serialization of XRPC response message
  */
 static bool
 xrpc_startSerialize(XqueryCtx *ctx)
 {
-    stream_write(ctx->out, "HTTP/1.1 200 OK\r\n"
+    int ret = 0;
+    BUN bun = BUN_NONE;
+    BAT *sucsBAT = NULL;
+    BATiter wsi;
+    oid batID = XRPC_SUCCESSOR, idx = oid_nil;
+    char *name = NULL;
+    
+    /* Check if this XRPC query requires isolation.  If yes, we need to
+     * serialize 'queryID' and 'participants'. */
+    wsi = bat_iterator(ctx->ws);
+    BUNfndOID(bun, wsi, (ptr)&batID);
+    if(bun == BUN_NONE) return PROBLEM;
+    if(!(sucsBAT = BATdescriptor(*(bat*)Tloc(ctx->ws,bun))))
+        return PROBLEM;
+    if(!(name = BBPname(BBPcacheid(sucsBAT)))) {
+        BBPunfix(BBPcacheid(sucsBAT));
+        return PROBLEM;
+    }
+
+    /* check if the query has timed out or not, before printing results */
+    if(strncmp(name, "tmp_", 4) != 0) {
+        errno = 0;
+        idx = (oid) strtol(name, NULL, 10);
+        if(errno) {
+            GDKerror("xrpc_startSerialize: failed to convert the name "
+                    "of the BAT ws[XRPC_SUCCESS] to an oid: %s.\n",
+                    strerror(errno));
+            return PROBLEM;
+        }
+
+        ret = check_timeout_by_idx(idx);
+        if(ret < 0) {
+            return PROBLEM;
+        } else if(ret == 1) {
+            GDKerror("xrpc_startSerialize: xrpc-query timed out\n");
+            return PROBLEM;
+        }
+    }
+
+    ret = stream_write(ctx->out,
+            "HTTP/1.1 200 OK\r\n"
             "Content-type: text/xml; charset=\"utf-8\"\r\n\r\n", 1, 60);
+    if(ret != 60) return PROBLEM;
 
     /* We need to prepend "=" to each line, so make sure
        that we print one after each newline (encoded in
        newline string 'newline')->
        In addition we print one '=' at the beginning-> */
     if (ctx->modes & MODE_MAPI)
-        stream_write (ctx->out, &e_, 1, 1);
+        if((ret = stream_write (ctx->out, &e_, 1, 1)) != 1)
+            return PROBLEM;
 
-    stream_printf(ctx->out, 
+    ret = stream_printf(ctx->out, 
             "<?xml version=\"1.0\" encoding=\"utf-8\"?>%s"
             "<env:Envelope "
             "xmlns:env=\"%s\" "
@@ -1029,7 +1153,20 @@
             XSI_NS,
             XRPC_NS, XRPC_LOC,
             dflt_ws->module, dflt_ws->method);
+    if(ret < 0) return PROBLEM;
+
+    if(strncmp("tmp_", name, 4) != 0) {
+        /* This is a 2PC-XRPC query!  The name of the ws[XRPC_SUCCESS] BAT
+         * encodes where to find the query meta-data in the xrpc_* BATs. */
 
+        if(xrpc_serializeQID(ctx, name) == PROBLEM ||
+           xrpc_serializeSuccessors(ctx, sucsBAT) == PROBLEM) {
+            BBPunfix(BBPcacheid(sucsBAT));
+            return PROBLEM;
+        }
+    }
+    
+    BBPunfix(BBPcacheid(sucsBAT));
     return SUCCESS;
 }
 

U shredder.mx
Index: shredder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shredder.mx,v
retrieving revision 1.136.4.1
retrieving revision 1.136.4.2
diff -u -d -r1.136.4.1 -r1.136.4.2
--- shredder.mx 25 May 2008 22:44:05 -0000      1.136.4.1
+++ shredder.mx 25 May 2008 22:55:10 -0000      1.136.4.2
@@ -1560,7 +1560,7 @@
     if (shredCtx == NULL) return NULL;
 
     /* fill it */
-    int i = 4;
+    int i = 9;
     memset(shredCtx, 0, sizeof(shredCtxStruct)); 
     shredCtx->location  = location;
     shredCtx->fileSize  = fileSize;

U Makefile.ag
Index: Makefile.ag
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/Makefile.ag,v
retrieving revision 1.86
retrieving revision 1.86.4.1
diff -u -d -r1.86 -r1.86.4.1
--- Makefile.ag 20 May 2008 10:49:07 -0000      1.86
+++ Makefile.ag 25 May 2008 22:55:07 -0000      1.86.4.1
@@ -55,7 +55,7 @@
                 pf_support.mx \
                 shredder.mx 
         LIBS = \
-                libserialize libpf $(PF_LIBS) \
+                -l_xrpc_common libserialize libpf $(PF_LIBS) \
                 $(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS) -lmonet 
$(PTHREAD_LIBS) \
                 $(MONETDB4_MODS) -l_lock -l_monettime -l_streams -l_builtin 
-l_ascii_io -l_algebra -l_constant
 }
@@ -98,11 +98,17 @@
                 $(MONETDB4_MODS) -l_logger -l_streams -l_builtin -l_ascii_io 
-l_algebra -l_sys -l_constant -l_mapi 
 }
 
+lib__xrpc_common = {
+        DIR = libdir/MonetDB4
+        SOURCES = xrpc_common.mx
+        LIBS = \
+                          $(SOCKET_LIBS) \
+                          $(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS)
+}      
+
 lib__xrpc_client = {
         DIR = libdir/MonetDB4
-        SOURCES = \
-                                 xrpc_common.mx \
-                                 xrpc_client.mx
+        SOURCES = xrpc_client.mx
         LIBS = \
                           libserialize $(PF_LIBS) $(SOCKET_LIBS) 
./lib_pf_support ./lib_pathfinder \
                           $(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS) 
-lmonet \
@@ -113,7 +119,6 @@
         DIR = libdir/MonetDB4
         SOURCES = \
                                  shttpd.c shttpd.h \
-                                 xrpc_common.mx \
                                  xrpc_server.mx
         LIBS = \
                 ../compiler/libcompiler1 \
@@ -140,5 +145,5 @@
 headers_mil = {
        HEADERS = mil
         DIR = libdir/MonetDB4
-        SOURCES = pathfinder.mx pf_support.mx pf_standoff.mx xrpc_client.mx 
xrpc_server.mx
+        SOURCES = pathfinder.mx pf_support.mx pf_standoff.mx xrpc_client.mx 
xrpc_server.mx xrpc_common.mx
 }

U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.1
retrieving revision 1.68.4.2
diff -u -d -r1.68.4.1 -r1.68.4.2
--- xrpc_server.mx      25 May 2008 22:44:07 -0000      1.68.4.1
+++ xrpc_server.mx      25 May 2008 22:55:11 -0000      1.68.4.2
@@ -116,8 +116,8 @@
 }
 ADDHELP("get_xrpc_options", "zhang", "March 2007",
 "DESCRIPTION:\n\
-Find the options that should be pass to the XRPC server, currently only\
-\"timing\"",
+Find the options that should be pass to the XRPC server, currently \
+\"timing\" or \"debug\"",
 "xrpc_server");
 
 PROC rpcd_start() : void {
[...1240 lines suppressed...]
+            debug = 4;
+    }
 
     /* Register call back function, for XRPC (admin) requests, and XML 
(get/put/delete) file handling */
     shttpd_register_url(XRPC_REQ_CALLBACK, xrpc_fork_mapiclient, NULL);
@@ -1141,6 +1360,14 @@
     ctx = shttpd_open_port(xrpc_port, *open);
     listen_socket = ctx.sock;
 
+    /* Check for expired queries every 5 seconds */
+    sigfillset(&set); /* include all signals in 'set' */
+    act.sa_handler = check_timeout;
+    act.sa_mask = set; /* block all other signals */
+    act.sa_flags = 0; 
+    sigaction(SIGALRM, &act, NULL);
+    alarm(50000000);
+
     /* Serve connections infinitely until someone kills us */
     for ( ; rpcd_running; ) shttpd_poll(&ctx, 200);
 


-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins

Reply via email to