Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv11815/runtime
Modified Files:
Tag: xrpcdemo
Makefile.ag pathfinder.mx pf_support.mx serialize_dflt.mx
shredder.mx xrpc_client.mx xrpc_common.mx xrpc_server.mx
Log Message:
Ongoing implementation for the SIGMOD demo (now to the right branch)
U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.1.2.1
retrieving revision 1.416.2.1.2.2
diff -u -d -r1.416.2.1.2.1 -r1.416.2.1.2.2
--- pathfinder.mx 25 May 2008 22:44:02 -0000 1.416.2.1.2.1
+++ pathfinder.mx 25 May 2008 22:55:07 -0000 1.416.2.1.2.2
@@ -94,6 +94,7 @@
module(pf_support);
module(logger);
module(mkey);
+module(xrpc_common);
module(xrpc_server);
module(xrpc_client);
module(pf_standoff);
@@ -340,29 +341,30 @@
@:[EMAIL PROTECTED](CONT_NAME, 41, void, str, void, void, oid_nil)@
@:[EMAIL PROTECTED](CONT_RUNTIME, 42, void, bat, void, void, oid_nil)@
@:[EMAIL PROTECTED](CONT_LOCKED, 43, void, lock,void, void, oid_nil)@
+@:[EMAIL PROTECTED](XRPC_SUCCESSOR, 44, void, str, void, void, oid_nil)@
-@:[EMAIL PROTECTED](_MAP_PID, 44, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_SIZE, 45, void, bat, void, int, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_LEVEL, 46, void, bat, void, chr, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_PROP, 47, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_KIND, 48, void, bat, void, chr, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_NID, 49, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_NID_RID, 50, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_FRAG_ROOT, 51, void, bat, oid, oid, oid_nil)@
-@:[EMAIL PROTECTED](_QN_HISTOGRAM, 52, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_PREFIX_URI_LOC,53, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_URI_LOC, 54, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_PREFIX, 55, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_URI, 56, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_LOC, 57, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_TEXT, 58, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_COM, 59, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_INS, 60, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_TGT, 61, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_VAL, 62, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_OWN, 63, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_QN, 64, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_PROP, 65, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_MAP_PID, 45, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_SIZE, 46, void, bat, void, int, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_LEVEL, 47, void, bat, void, chr, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_PROP, 48, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_KIND, 49, void, bat, void, chr, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_NID, 50, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_NID_RID, 51, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_FRAG_ROOT, 52, void, bat, oid, oid, oid_nil)@
+@:[EMAIL PROTECTED](_QN_HISTOGRAM, 53, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_PREFIX_URI_LOC,54, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_URI_LOC, 55, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_PREFIX, 56, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_URI, 57, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_LOC, 58, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_TEXT, 59, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_COM, 60, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_INS, 61, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_TGT, 62, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_VAL, 63, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_OWN, 64, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_QN, 65, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_PROP, 66, void, bat, void, oid, PRE_BASE)@
@-
The bottom segment are the master copies of the top segment.
The middle segment is private to the query.
@@ -372,7 +374,7 @@
BEWARE: the logger version number below needs to be incremented whenever the
working set schema is changed.
@= ws_decl
-@:[EMAIL PROTECTED](WS_SIZE, 66)@
+@:[EMAIL PROTECTED](WS_SIZE, 67)@
@:[EMAIL PROTECTED](QNAME, 2)@
@:[EMAIL PROTECTED](BOOL, 3)@
@:[EMAIL PROTECTED](INT, 4)@
@@ -394,7 +396,7 @@
@:ws_decl(mil)@
# logger version number, needs to be incremented whenever the working set
schema is changed
-const LOGGER_VERSION := 1;
+const LOGGER_VERSION := 2;
# KIND constants, carefully chosen
# atomic value items can be retrieved with 'kind.select(int_nil,ATOMIC)'
@@ -568,7 +570,7 @@
# there is just a global lock for this.
var pf_extend := pflock_get(3); # protects the counter
var pf_extend_cnt := 0; # counter is used by nonexclusive acces (first gets
barrier, last releases)
-var pf_extend_barrier := sema(ptr(pflock_get(4))); # the barrier
+var pf_extend_barrier := sema(ptr(pflock_get(9))); # the barrier
# master bat *UPDATES* must be protected against checkpoints (global BBP
subcommits + log restart)
# master updates take a non-exclusive pf_chkpt lock; pf_checkpoint() takes an
exclusive access
@@ -633,6 +635,11 @@
commitBAT.append("doc_collection");
commitBAT.append("doc_timestamp");
commitBAT.append("uri_lifetime");
+ commitBAT.append("xrpc_qids");
+ commitBAT.append("xrpc_wsids");
+ commitBAT.append("xrpc_timeouts");
+ commitBAT.append("xrpc_locks");
+ commitBAT.append("xrpc_statuss");
var ok := false;
lock_set(pf_wal);
var err := CATCH(ok := subcommit(commitBAT));
@@ -710,6 +717,14 @@
var uri_lifetime; # caching rules
+# meta-data BATs for multi-request XRPC transactions
+var xrpc_qids; # bat[void,str] query-id (host|timestamp)
+var xrpc_timeouts; # bat[void,lng] query timeout in msec
+var xrpc_wsids; # bat[void,lng] ID of the workingset associated with
this query
+var xrpc_locks; # bat[void,lock] lock for the workingset of this query
+var xrpc_statuss; # bat[void,str] 2PC status of this query: "exec",
"prepare", "commit" or "abort"
+var xrpclock := pflock_get(6); # master XRPC lock, to protect above xrpc_* BATs
+
# initialize persistent bats (use BBP as global mechanism to discover
initialization)
if (isnil(CATCH(bat("doc_name").count()))) {
collection_name := bat("collection_name");
@@ -719,6 +734,11 @@
doc_location := bat("doc_location");
doc_timestamp := bat("doc_timestamp");
uri_lifetime := bat("uri_lifetime");
+ xrpc_qids := bat("xrpc_qids");
+ xrpc_timeouts := bat("xrpc_timeouts");
+ xrpc_wsids := bat("xrpc_wsids");
+ xrpc_locks := bat("xrpc_locks");
+ xrpc_statuss := bat("xrpc_statuss");
# check that this database was created with a compatible pagesize as
requested now
var pagebits := 16;
@@ -734,6 +754,11 @@
doc_location := new(oid,str).persists(true).rename("doc_location");
doc_timestamp :=
new(oid,timestamp).persists(true).rename("doc_timestamp");
uri_lifetime := new(str,lng).insert("tmp",
1LL).persists(true).rename("uri_lifetime");
+ xrpc_qids := new(void,str).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_qids");
+ xrpc_timeouts := new(void,lng).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_timeouts");
+ xrpc_wsids := new(void,lng).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_wsids");
+ xrpc_locks := new(void,lock).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_locks");
+ xrpc_statuss := new(void,str).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_statuss");
doc_collection.insert(DOCID_CURID_HACK, DOCID_MIN); # hack: store next oid
as in invalid tuple
doc_collection.insert(DOCID_PGBIT_HACK, oid(REMAP_PAGE_BITS)); # hack:
store pagesize also
pf_checkpoint(bat(void,str).append("uri_lifetime"));
@@ -4016,7 +4041,7 @@
/* exports for XRPC */
pathfinder_export void xquery_client_engine(mapi_client*);
-pathfinder_export char* xquery_method(mapi_client*, int, char*, char*, char*,
lng, lng, lng**, str*, str*, BAT*);
+pathfinder_export char* xquery_method(mapi_client*, int, char*, lng, char*,
char*, char*, lng, lng, lng**, str*, str*, BAT*);
pathfinder_export void xquery_client_end(mapi_client *, char *);
#endif
@@ -4494,7 +4519,7 @@
* - resolve a method call in the current xquery context (return NULL if not
resolved)
*
* char*
- * xquery_function_call(xquery_client *ctx, lng usec, char *ns, char *method,
+ * xquery_function_call(xquery_client *ctx, lng usec, char* qid, lng timeout,
char *ns, char *method,
* int argc, int itercnt, int** argcnt, char** argtpe,
char** argval, BAT *shredBAT)
* - call a function ns:method(). try to use the function cache (ie re-use a
cached MIL tree).
* otherwise generate MIL yourself, interpret it (and cache it). Returns
error string (NULL if ok).
@@ -4664,6 +4689,8 @@
static char*
xquery_function_call(xquery_client *ctx,
lng usec,
+ char *qid,
+ lng timeout,
char *ns,
char *method,
lng argc,
@@ -4717,11 +4744,16 @@
if (fun->mil == NULL) {
MT_set_lock(pf_cache_lock, "xquery_function_call");
if (fun->mil == NULL) {
- /* create working set */
int ret;
-
- src = (char*) PFstartMIL(fun->sig->update);
- while(*src && cur < end) *cur++ = *src++;
+
+ /* create working set */
+ if(qid && *qid) { /* multi-request XRPC transaction */
+ ret = PFstartMIL_XRPCTrans(cur, XQUERY_BUFSIZE-(cur-mil),
fun->sig->update, qid, timeout);
+ if(ret > 0) cur += ret;
+ } else {
+ src = (char*) PFstartMIL(fun->sig->update);
+ while(*src && cur < end) *cur++ = *src++;
+ }
if (shredBAT) {
/* add shredded RPC request message to the working set */
@@ -4737,7 +4769,11 @@
if (ret > 0) cur += ret;
/* destroy working set */
- ret = snprintf(cur, XQUERY_BUFSIZE-(cur-mil),
PFstopMIL(fun->sig->update));
+ if(qid && *qid){ /* multi-request XRPC transaction */
+ ret = PFstopMIL_XRPCTrans(cur, XQUERY_BUFSIZE-(cur-mil),
fun->sig->update, qid);
+ } else {
+ ret = snprintf(cur, XQUERY_BUFSIZE-(cur-mil),
PFstopMIL(fun->sig->update));
+ }
mil[XQUERY_BUFSIZE - 1] = 0;
if (ret > 0) cur += ret;
@@ -4848,7 +4884,7 @@
stream_write(ctx->fderr, fun->mil, strlen(fun->mil), 1);
}
- /* set the MIL shredBAT and getType variables to the actual values */
+ /* set the MIL shredBAT and genType variables to the actual values */
ctx->shredBAT[0] = shredBAT?shredBAT->batCacheid:0;
*(ctx->time_compile) = GDKusec() - usec;
@@ -5731,7 +5767,7 @@
lng* cnt_ptr = cnt;
char nsbak = *nsend, locbak = *locend;
*nsend = 0; *locend = 0;
- err = xquery_function_call(ctx, usec, ns, loc, argc, 1,
&cnt_ptr, tpe, param, NULL);
+ err = xquery_function_call(ctx, usec, NULL, 0, ns, loc,
argc, 1, &cnt_ptr, tpe, param, NULL);
*nsend = nsbak; *locend = locbak;
}
}
@@ -6065,6 +6101,7 @@
/* xquery_method : execute a loop-lifted xquery function
*
+ * flags = 1: timing, 2: serialization mode, 4: debug
* argc = #params
* itercnt = #iterations
* argcnt[iter] = #items per param
@@ -6079,6 +6116,8 @@
char*
xquery_method(mapi_client *mc,
int flags,
+ char* qid,
+ lng timeout,
char* module,
char* uri,
char* method,
@@ -6106,8 +6145,7 @@
err = xquery_change_genType(ctx, mode);
if (err) return err;
- if (argc >= 1000) {
- /* hack: pass argc+1000 and you get debug output */
+ if (flags&4) { /* write debug output */
s = open_wastream("/tmp/xrpc.mil");
char *prologue = (char*) PFinitMIL();
if (s) {
@@ -6115,14 +6153,13 @@
ctx->mode |= XQ_DEBUG;
}
stream_write(ctx->fderr, prologue, strlen(prologue), 1);
- argc = argc % 1000;
}
if (err == NULL && module){
err = xquery_module_load(ctx, ns="xrpc", module, uri);
}
if (err == NULL) {
- err = xquery_function_call(ctx, usec, ns, method, argc, itercnt,
argcnt, argtpe, argval, shredBAT);
+ err = xquery_function_call(ctx, usec, qid, timeout, ns, method, argc,
itercnt, argcnt, argtpe, argval, shredBAT);
if (err == (char*) -1) err = "xquery_method: function could not be
resolved.\n";
else if (err == xquery_function_error) err =
xquery_nondescriptive_error;
}
U xrpc_common.mx
Index: xrpc_common.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_common.mx,v
retrieving revision 1.1.8.1
retrieving revision 1.1.8.2
diff -u -d -r1.1.8.1 -r1.1.8.2
--- xrpc_common.mx 25 May 2008 22:44:07 -0000 1.1.8.1
+++ xrpc_common.mx 25 May 2008 22:55:11 -0000 1.1.8.2
@@ -26,6 +26,14 @@
@a Ying Zhang
@t Includes header files, definitions shared by the XRPC server and XRPC client
[EMAIL PROTECTED]
+.MODULE xrpc_common;
+
+.COMMAND my_hostname() : str = CMDmy_hostname;
+"Returns the hostname of the localhost."
+
+.END xrpc_common;
+
@h
#ifndef XRPC_COMMON_H
#define XRPC_COMMON_H
@@ -46,14 +54,20 @@
#include <sys/select.h>
#include <sys/types.h> /* used by socket */
#include <sys/socket.h>
- #include <unistd.h>
+ #include <unistd.h> /* gethostname() */
#include <netinet/in.h> /* hton and ntoh */
#include <arpa/inet.h> /* dotted IP addr to and from 32-bits int */
- #include <netdb.h> /* convert domain names into IP addr */
+ #include <netdb.h> /* gethostbyname(), h_errno */
#include <errno.h>
#include <ctype.h>
#endif
+#define ISOLATION_NONE 1
+#define ISOLATION_REPEATABLE 2
+#define ISOLATION_2PC 3
+
+#define XRPCLOCK_IDX 6
+
#define XRPC_REQ_CALLBACK "/xrpc"
#define MXQ_ADMIN "http://monetdb.cwi.nl/XQuery/admin/"
@@ -79,6 +93,7 @@
" xrpc:arity=\"%lld\"" \
" xrpc:iter-count=\"%lld\"" \
" xrpc:updCall=\"%s\">"
+#define XRPC_QID "<xrpc:queryID xrpc:host=\"%s\" xrpc:timestamp=\"%s\"
xrpc:timeout=\"%s\"/>"
#define XRPC_HTTP_CALL "<xrpc:call>" \
"<xrpc:sequence>" \
@@ -103,5 +118,178 @@
"</env:Body>" \
"</env:Envelope>\n"
+/* exports for xrpc_server.mx and serialize_dflt.mx */
+int check_timeout_by_idx(oid qid_idx);
+int check_timeout_by_qid(char *qid, lng timestamp, lng timeout);
+
#endif /* XRPC_COMMON_H */
[EMAIL PROTECTED]
+#include "pf_config.h"
+#include "xrpc_common.h"
+#include "pf_support.h"
+
+int CMDmy_hostname(char **res)
+{
+ int ret = 0, len = HOST_NAME_MAX > 255 ? HOST_NAME_MAX : 255;
+ char err[1024];
+
+ char *hname = GDKmalloc(len);
+ if(!hname) {
+ GDKerror("CMDmy_hostname: failed to malloc 'hname'\n");
+ return GDK_FAIL;
+ }
+
+ errno = 0;
+ ret = gethostname(hname, len);
+ if(ret < 0) {
+ snprintf(err, 1024, "CMDmy_hostname: gethostname() failed: %s.\n",
strerror(errno));
+ return GDK_FAIL;
+ }
+
+ *res = hname;
+ return GDK_SUCCEED;
+}
+
+/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
+int
+check_timeout_by_qid(char *qid, lng timestamp, lng timeout)
+{
+ MT_Lock *xrpclock = NULL;
+ int xrpclock_idx = XRPCLOCK_IDX;
+ BAT *xrpc_qids = NULL, *xrpc_statuss = NULL;
+ BATiter qidsi;
+ oid qid_idx = oid_nil;
+ BUN bun_qid = BUN_NONE;
+
+ if((timestamp + (timeout * 1000)) > GDKusec())
+ return 0; /* not timed out */
+
+ if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL) {
+ GDKerror("check_timeout_by_qid: "
+ "failed to get the global lock 'xrpclock'!\n");
+ return -2;
+ }
+ MT_lock_set(xrpclock, "check_timeout_by_qid");
+
+ xrpc_qids = BATdescriptor(BBPindex("xrpc_qids"));
+ xrpc_statuss = BATdescriptor(BBPindex("xrpc_statuss"));
+ if(!xrpc_qids || !xrpc_statuss) {
+ if(xrpc_qids) BBPunfix(BBPcacheid(xrpc_qids));
+ if(xrpc_statuss) BBPunfix(BBPcacheid(xrpc_statuss));
+ MT_lock_unset(xrpclock, "check_timeout_by_qid");
+ GDKerror("check_timeout_by_qid: "
+ "could not find the xrpc_* meta-BATs!\n");
+ return -2;
+ }
+
+ if((bun_qid = BUNfnd(BATmirror(xrpc_qids), qid)) == BUN_NONE) {
+ BBPunfix(BBPcacheid(xrpc_qids));
+ BBPunfix(BBPcacheid(xrpc_statuss));
+ MT_lock_unset(xrpclock, "check_timeout_by_qid");
+ GDKerror("check_timeout_by_qid: "
+ "could not find QID \"%s\" in the meta-BAT "
+ "\"xrpc_qids\"\n", qid);
+ return -1;
+ }
+
+ qidsi = bat_iterator(xrpc_qids);
+ qid_idx = *(oid*)BUNtail(qidsi, bun_qid);
+ if(!BUNreplace(xrpc_statuss, &qid_idx, "timedout", FALSE)){
+ BBPunfix(BBPcacheid(xrpc_qids));
+ BBPunfix(BBPcacheid(xrpc_statuss));
+ MT_lock_unset(xrpclock, "check_timeout_by_qid");
+ GDKerror("check_timeout_by_qid: "
+ "failed to change the status of query \"%s\" into "
+ "\"timedout\"\n", qid);
+ return -2;
+ }
+
+ BBPunfix(BBPcacheid(xrpc_qids));
+ BBPunfix(BBPcacheid(xrpc_statuss));
+ MT_lock_unset(xrpclock, "check_timeout_by_qid");
+ return 1; /* timed out */
+}
+
+/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
+int
+check_timeout_by_idx(oid qid_idx)
+{
+ MT_Lock *xrpclock = NULL;
+ int xrpclock_idx = XRPCLOCK_IDX, ret = -2;
+ BAT *xrpc_qids = NULL, *xrpc_timeouts = NULL, *xrpc_statuss = NULL;
+ BATiter qidsi, timeoutsi;
+ BUN bun_qid = BUN_NONE, bun_to = BUN_NONE;
+ char *qid = NULL, *ts_str = NULL, *to_str = NULL;
+ lng ts = 0, to = 0;
+ char err[1024];
+
+ if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL) {
+ GDKerror("check_timeout_by_idx: "
+ "failed to get the global lock 'xrpclock'!\n");
+ return ret;
+ }
+
+ MT_lock_set(xrpclock, "check_timeout_by_idx");
+
+ xrpc_qids = BATdescriptor(BBPindex("xrpc_qids"));
+ xrpc_timeouts = BATdescriptor(BBPindex("xrpc_timeouts"));
+ xrpc_statuss = BATdescriptor(BBPindex("xrpc_statuss"));
+ if(!xrpc_qids || !xrpc_timeouts || !xrpc_statuss) {
+ snprintf(err, 1024, "check_timeout_by_idx: failed to retrieve "
+ "the xrpc_* meta-BATs!\n");
+ goto unlock_ret;
+ }
+
+ qidsi = bat_iterator(xrpc_qids);
+ timeoutsi = bat_iterator(xrpc_timeouts);
+ BUNfndVOID(bun_qid, qidsi, &qid_idx);
+ BUNfndVOID(bun_to, timeoutsi, &qid_idx);
+ if(bun_qid == BUN_NONE || bun_to == BUN_NONE) {
+ snprintf(err, 1024, "check_timeout_by_idx: "OIDFMT"-th BUN not "
+ "found in the xrpc_* meta-BATs!\n", qid_idx);
+
+ ret = -1;
+ goto unlock_ret;
+ }
+
+ qid = (char*)BUNtail(qidsi, bun_qid);
+ to_str = (char*)BUNtail(timeoutsi, bun_to); assert(qid && to_str);
+ ts_str = strrchr(qid, '|'); assert(ts_str);
+ errno = 0; ts = strtoll(ts_str, NULL, 10);
+ if(errno) {
+ snprintf(err, 1024, "check_timeout_by_idx: invalid value of "
+ "timestamp: \"%s\"!\n", ts_str);
+ goto unlock_ret;
+ }
+ errno = 0; to = strtoll(to_str, NULL, 10);
+ if(errno) {
+ snprintf(err, 1024, "check_timeout_by_idx: invalid value of "
+ "timeout: \"%s\"!\n", to_str);
+ goto unlock_ret;
+ }
+
+ if((ts + (to * 1000)) > GDKusec()) {
+ ret = 0; /* not timed out */
+ goto unlock_ret;
+ }
+
+ if(!BUNreplace(xrpc_statuss, &qid_idx, "timedout", FALSE)){
+ snprintf(err, 1024, "check_timeout_by_idx: failed to change "
+ "the status of query \"%s\" into "
+ "\"timedout\"\n", qid);
+ goto unlock_ret;
+ }
+
+ ret = 1; /* timed out */
+ goto unlock_ret;
+
+unlock_ret:
+ if(xrpc_qids) BBPunfix(BBPcacheid(xrpc_qids));
+ if(xrpc_timeouts) BBPunfix(BBPcacheid(xrpc_timeouts));
+ if(xrpc_statuss) BBPunfix(BBPcacheid(xrpc_statuss));
+ MT_lock_unset(xrpclock, "check_timeout_by_idx");
+ if(ret < 0)
+ GDKerror(err);
+ return ret;
+}
U pf_support.mx
Index: pf_support.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pf_support.mx,v
retrieving revision 1.299
retrieving revision 1.299.4.1
diff -u -d -r1.299 -r1.299.4.1
--- pf_support.mx 22 May 2008 12:19:28 -0000 1.299
+++ pf_support.mx 25 May 2008 22:55:08 -0000 1.299.4.1
@@ -8111,7 +8111,7 @@
@c
#define PF_CONVOY 5
BAT* ws_overlaps_ws = NULL;
-MT_Lock pf_runtime_lock[6];
+MT_Lock pf_runtime_lock[7];
MT_Sema pf_runtime_sema[3];
int pf_nreaders, pf_convoy, pf_ndocmgt;
lng pf_writer, pf_special;
@@ -8122,6 +8122,7 @@
#define PF_EXTEND_LOCK pf_runtime_lock[3]
#define PF_META_LOCK pf_runtime_lock[4]
#define PF_UPDATE_LOCK pf_runtime_lock[5]
+#define PF_XRPC_LOCK pf_runtime_lock[6]
#define PF_META_BARRIER pf_runtime_sema[0]
#define PF_UPDATE_BARRIER pf_runtime_sema[1]
#define PF_EXTEND_BARRIER pf_runtime_sema[2]
@@ -8133,8 +8134,8 @@
/* get handle to the shared locks */
int CMDpflock_get(ptr *ret, int* nr) {
- *ret = (ptr) &pf_runtime_lock[(*nr)&3];
- if (*nr > 3) *ret = (ptr) &PF_EXTEND_BARRIER; /* hack: do not feel like
creating a CMDsema_get */
+ *ret = (ptr) &pf_runtime_lock[(*nr)&6];
+ if (*nr > 6) *ret = (ptr) &PF_EXTEND_BARRIER; /* hack: do not feel like
creating a CMDsema_get */
return GDK_SUCCEED;
}
U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.4.1
retrieving revision 1.45.4.2
diff -u -d -r1.45.4.1 -r1.45.4.2
--- xrpc_client.mx 25 May 2008 22:44:05 -0000 1.45.4.1
+++ xrpc_client.mx 25 May 2008 22:55:11 -0000 1.45.4.2
@@ -40,14 +40,19 @@
@- HTTP client function(s)
@m
+
+.USE lock;
+
.COMMAND http_post(
str options,
+ str qid,
+ lng timeout,
[...1311 lines suppressed...]
+ GDKerror("do_simple_query: failed to create socket_rastream\n");
return GDK_FAIL;
+ }
+ shredBAT = response2bat(timing, in, dst, port, *updCall);
+
+ stream_close(out); stream_destroy(out);
+ stream_close(in); stream_destroy(in);
+ buffer_destroy(req);
+
+ if(!shredBAT) return GDK_FAIL;
+
+ if(qid && *qid) {
+ if(add_xrpc_hosts(shredBAT, ws) == GDK_FAIL) {
+ BBPreclaim(shredBAT);
+ return GDK_FAIL;
+ }
+ }
*res = shredBAT;
return GDK_SUCCEED;
U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46
retrieving revision 1.46.4.1
diff -u -d -r1.46 -r1.46.4.1
--- serialize_dflt.mx 7 Apr 2008 12:54:13 -0000 1.46
+++ serialize_dflt.mx 25 May 2008 22:55:09 -0000 1.46.4.1
@@ -40,7 +40,7 @@
/* contains dummy callback functions */
#include "serialize_null.h"
#include "pathfinder.h"
-#include "xrpc_common.h"
+#include "xrpc_common.h" /* *_NS defs, check_timeout_by_idx() */
/* a lot of characters, static strings, and their sizes
(the idea is to reuse constant character pointers during
@@ -996,23 +996,147 @@
return SUCCESS;
}
+static bool
+xrpc_serializeQID(XqueryCtx *ctx, char *qid_idx)
+{
+ MT_Lock *xrpclock = NULL;
+ int xrpclock_idx = XRPCLOCK_IDX, ret = 0;
+ BAT *xrpc_qids = NULL, *xrpc_timeouts = NULL;
+ BATiter qidi, toi;
+ char *qid = NULL, *ts = NULL;
+ lng timeout = 0;
+ BUN bun_qid = BUN_NONE, bun_to = BUN_NONE;
+ oid idx = oid_nil;
+
+ if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL)
+ return PROBLEM;
+
+ MT_lock_set(xrpclock, "xrpc_serializeQID");
+ if((xrpc_qids = BATdescriptor(BBPindex("xrpc_qids")))) {
+ if((xrpc_timeouts = BATdescriptor(BBPindex("xrpc_timeouts")))) {
+ errno = 0;
+ ret = strtol(qid_idx, NULL, 10);
+ if(errno) {
+ GDKerror(strerror(errno));
+ goto err_ret;
+ }
+
+ idx = (oid) ret;
+ qidi = bat_iterator(xrpc_qids);
+ toi = bat_iterator(xrpc_timeouts);
+ BUNfndVOID(bun_qid, qidi, &idx);
+ BUNfndVOID(bun_to, toi, &idx);
+ if(bun_qid == BUN_NONE || bun_to == BUN_NONE)
+ goto err_ret;
+
+ qid = (char*)BUNtail(qidi, bun_qid);
+ timeout = *(lng*)BUNtail(toi, bun_to);
+ ts = strrchr(qid, '|'); assert(ts);
+ ts[0] = '\0'; ts++; /* separate 'timestamp' from 'host:port' */
+
+ ret = stream_printf(ctx->out,
+ "<xrpc:queryID host=\"%s\" timestamp=\"%s\"
timeout=\""LLFMT"\"/>",
+ qid, ts, timeout);
+
+ ts--; ts[0] = '|';
+
+ if(ret < 0) goto err_ret;
+ BBPunfix(BBPcacheid(xrpc_qids));
+ BBPunfix(BBPcacheid(xrpc_timeouts));
+ MT_lock_unset(xrpclock, "xrpc_serializeQID");
+ return SUCCESS;
+
+ }
+ BBPunfix(BBPcacheid(xrpc_qids));
+ }
+ MT_lock_unset(xrpclock, "xrpc_serializeQID");
+ return PROBLEM;
+
+err_ret:
+ BBPunfix(BBPcacheid(xrpc_qids));
+ BBPunfix(BBPcacheid(xrpc_timeouts));
+ MT_lock_unset(xrpclock, "xrpc_serializeQID");
+ return PROBLEM;
+}
+
+static bool
+xrpc_serializeSuccessors(XqueryCtx *ctx, BAT *sucsBAT)
+{
+ oid i, cnt = BATcount(sucsBAT);
+ char *host = NULL;
+ BATiter sucsBATi = bat_iterator(sucsBAT);
+
+ if(stream_write(ctx->out, "<xrpc:participants>", 1, 19) != 19)
+ return PROBLEM;
+ for(i = 0; i < cnt; i++){
+ host = (char *) BUNtail(sucsBATi, i);
+ stream_printf(ctx->out, "<xrpc:participant>%s</xrpc:participant>",
host);
+ }
+ if(stream_write(ctx->out, "</xrpc:participants>", 1, 20) != 20)
+ return PROBLEM;
+
+ return SUCCESS;
+}
+
/**
* Start serialization of XRPC response message
*/
static bool
xrpc_startSerialize(XqueryCtx *ctx)
{
- stream_write(ctx->out, "HTTP/1.1 200 OK\r\n"
+ int ret = 0;
+ BUN bun = BUN_NONE;
+ BAT *sucsBAT = NULL;
+ BATiter wsi;
+ oid batID = XRPC_SUCCESSOR, idx = oid_nil;
+ char *name = NULL;
+
+ /* Check if this XRPC query requires isolation. If yes, we need to
+ * serialize 'queryID' and 'participants'. */
+ wsi = bat_iterator(ctx->ws);
+ BUNfndOID(bun, wsi, (ptr)&batID);
+ if(bun == BUN_NONE) return PROBLEM;
+ if(!(sucsBAT = BATdescriptor(*(bat*)Tloc(ctx->ws,bun))))
+ return PROBLEM;
+ if(!(name = BBPname(BBPcacheid(sucsBAT)))) {
+ BBPunfix(BBPcacheid(sucsBAT));
+ return PROBLEM;
+ }
+
+ /* check if the query has timed out or not, before printing results */
+ if(strncmp(name, "tmp_", 4) != 0) {
+ errno = 0;
+ idx = (oid) strtol(name, NULL, 10);
+ if(errno) {
+ GDKerror("xrpc_startSerialize: failed to convert the name "
+ "of the BAT ws[XRPC_SUCCESS] to an oid: %s.\n",
+ strerror(errno));
+ return PROBLEM;
+ }
+
+ ret = check_timeout_by_idx(idx);
+ if(ret < 0) {
+ return PROBLEM;
+ } else if(ret == 1) {
+ GDKerror("xrpc_startSerialize: xrpc-query timed out\n");
+ return PROBLEM;
+ }
+ }
+
+ ret = stream_write(ctx->out,
+ "HTTP/1.1 200 OK\r\n"
"Content-type: text/xml; charset=\"utf-8\"\r\n\r\n", 1, 60);
+ if(ret != 60) return PROBLEM;
/* We need to prepend "=" to each line, so make sure
that we print one after each newline (encoded in
newline string 'newline')->
In addition we print one '=' at the beginning-> */
if (ctx->modes & MODE_MAPI)
- stream_write (ctx->out, &e_, 1, 1);
+ if((ret = stream_write (ctx->out, &e_, 1, 1)) != 1)
+ return PROBLEM;
- stream_printf(ctx->out,
+ ret = stream_printf(ctx->out,
"<?xml version=\"1.0\" encoding=\"utf-8\"?>%s"
"<env:Envelope "
"xmlns:env=\"%s\" "
@@ -1029,7 +1153,20 @@
XSI_NS,
XRPC_NS, XRPC_LOC,
dflt_ws->module, dflt_ws->method);
+ if(ret < 0) return PROBLEM;
+
+ if(strncmp("tmp_", name, 4) != 0) {
+ /* This is a 2PC-XRPC query! The name of the ws[XRPC_SUCCESS] BAT
+ * encodes where to find the query meta-data in the xrpc_* BATs. */
+ if(xrpc_serializeQID(ctx, name) == PROBLEM ||
+ xrpc_serializeSuccessors(ctx, sucsBAT) == PROBLEM) {
+ BBPunfix(BBPcacheid(sucsBAT));
+ return PROBLEM;
+ }
+ }
+
+ BBPunfix(BBPcacheid(sucsBAT));
return SUCCESS;
}
U shredder.mx
Index: shredder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shredder.mx,v
retrieving revision 1.136.4.1
retrieving revision 1.136.4.2
diff -u -d -r1.136.4.1 -r1.136.4.2
--- shredder.mx 25 May 2008 22:44:05 -0000 1.136.4.1
+++ shredder.mx 25 May 2008 22:55:10 -0000 1.136.4.2
@@ -1560,7 +1560,7 @@
if (shredCtx == NULL) return NULL;
/* fill it */
- int i = 4;
+ int i = 9;
memset(shredCtx, 0, sizeof(shredCtxStruct));
shredCtx->location = location;
shredCtx->fileSize = fileSize;
U Makefile.ag
Index: Makefile.ag
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/Makefile.ag,v
retrieving revision 1.86
retrieving revision 1.86.4.1
diff -u -d -r1.86 -r1.86.4.1
--- Makefile.ag 20 May 2008 10:49:07 -0000 1.86
+++ Makefile.ag 25 May 2008 22:55:07 -0000 1.86.4.1
@@ -55,7 +55,7 @@
pf_support.mx \
shredder.mx
LIBS = \
- libserialize libpf $(PF_LIBS) \
+ -l_xrpc_common libserialize libpf $(PF_LIBS) \
$(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS) -lmonet
$(PTHREAD_LIBS) \
$(MONETDB4_MODS) -l_lock -l_monettime -l_streams -l_builtin
-l_ascii_io -l_algebra -l_constant
}
@@ -98,11 +98,17 @@
$(MONETDB4_MODS) -l_logger -l_streams -l_builtin -l_ascii_io
-l_algebra -l_sys -l_constant -l_mapi
}
+lib__xrpc_common = {
+ DIR = libdir/MonetDB4
+ SOURCES = xrpc_common.mx
+ LIBS = \
+ $(SOCKET_LIBS) \
+ $(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS)
+}
+
lib__xrpc_client = {
DIR = libdir/MonetDB4
- SOURCES = \
- xrpc_common.mx \
- xrpc_client.mx
+ SOURCES = xrpc_client.mx
LIBS = \
libserialize $(PF_LIBS) $(SOCKET_LIBS)
./lib_pf_support ./lib_pathfinder \
$(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS)
-lmonet \
@@ -113,7 +119,6 @@
DIR = libdir/MonetDB4
SOURCES = \
shttpd.c shttpd.h \
- xrpc_common.mx \
xrpc_server.mx
LIBS = \
../compiler/libcompiler1 \
@@ -140,5 +145,5 @@
headers_mil = {
HEADERS = mil
DIR = libdir/MonetDB4
- SOURCES = pathfinder.mx pf_support.mx pf_standoff.mx xrpc_client.mx
xrpc_server.mx
+ SOURCES = pathfinder.mx pf_support.mx pf_standoff.mx xrpc_client.mx
xrpc_server.mx xrpc_common.mx
}
U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.4.1
retrieving revision 1.68.4.2
diff -u -d -r1.68.4.1 -r1.68.4.2
--- xrpc_server.mx 25 May 2008 22:44:07 -0000 1.68.4.1
+++ xrpc_server.mx 25 May 2008 22:55:11 -0000 1.68.4.2
@@ -116,8 +116,8 @@
}
ADDHELP("get_xrpc_options", "zhang", "March 2007",
"DESCRIPTION:\n\
-Find the options that should be pass to the XRPC server, currently only\
-\"timing\"",
+Find the options that should be pass to the XRPC server, currently \
+\"timing\" or \"debug\"",
"xrpc_server");
PROC rpcd_start() : void {
[...1240 lines suppressed...]
+ debug = 4;
+ }
/* Register call back function, for XRPC (admin) requests, and XML
(get/put/delete) file handling */
shttpd_register_url(XRPC_REQ_CALLBACK, xrpc_fork_mapiclient, NULL);
@@ -1141,6 +1360,14 @@
ctx = shttpd_open_port(xrpc_port, *open);
listen_socket = ctx.sock;
+ /* Check for expired queries every 5 seconds */
+ sigfillset(&set); /* include all signals in 'set' */
+ act.sa_handler = check_timeout;
+ act.sa_mask = set; /* block all other signals */
+ act.sa_flags = 0;
+ sigaction(SIGALRM, &act, NULL);
+ alarm(50000000);
+
/* Serve connections infinitely until someone kills us */
for ( ; rpcd_running; ) shttpd_poll(&ctx, 200);
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins