Update of /cvsroot/monetdb/pathfinder/runtime
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv6789/runtime
Modified Files:
Tag: XQuery_0-24
Makefile.ag pathfinder.mx pf_support.mx serialize_dflt.mx
shredder.mx xrpc_client.mx xrpc_common.mx xrpc_server.mx
Log Message:
Undo of previous check-in that went to the wrong branch: I created a new
branch for the ongoing implementation, based on the new release branch,
so I thought my check-in would go to the newly created branch.
U pathfinder.mx
Index: pathfinder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pathfinder.mx,v
retrieving revision 1.416.2.2
retrieving revision 1.416.2.3
diff -u -d -r1.416.2.2 -r1.416.2.3
--- pathfinder.mx 25 May 2008 21:29:23 -0000 1.416.2.2
+++ pathfinder.mx 25 May 2008 21:49:50 -0000 1.416.2.3
@@ -94,7 +94,6 @@
module(pf_support);
module(logger);
module(mkey);
-module(xrpc_common);
module(xrpc_server);
module(xrpc_client);
module(pf_standoff);
@@ -341,30 +340,29 @@
@:[EMAIL PROTECTED](CONT_NAME, 41, void, str, void, void, oid_nil)@
@:[EMAIL PROTECTED](CONT_RUNTIME, 42, void, bat, void, void, oid_nil)@
@:[EMAIL PROTECTED](CONT_LOCKED, 43, void, lock,void, void, oid_nil)@
-@:[EMAIL PROTECTED](XRPC_SUCCESSOR, 44, void, str, void, void, oid_nil)@
-@:[EMAIL PROTECTED](_MAP_PID, 45, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_SIZE, 46, void, bat, void, int, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_LEVEL, 47, void, bat, void, chr, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_PROP, 48, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_KIND, 49, void, bat, void, chr, PRE_BASE)@
-@:[EMAIL PROTECTED](_RID_NID, 50, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_NID_RID, 51, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_FRAG_ROOT, 52, void, bat, oid, oid, oid_nil)@
-@:[EMAIL PROTECTED](_QN_HISTOGRAM, 53, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_PREFIX_URI_LOC,54, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_URI_LOC, 55, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_PREFIX, 56, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_URI, 57, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_QN_LOC, 58, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_TEXT, 59, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_COM, 60, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_INS, 61, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_TGT, 62, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_PROP_VAL, 63, void, bat, void, str, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_OWN, 64, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_QN, 65, void, bat, void, oid, PRE_BASE)@
-@:[EMAIL PROTECTED](_ATTR_PROP, 66, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_MAP_PID, 44, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_SIZE, 45, void, bat, void, int, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_LEVEL, 46, void, bat, void, chr, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_PROP, 47, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_KIND, 48, void, bat, void, chr, PRE_BASE)@
+@:[EMAIL PROTECTED](_RID_NID, 49, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_NID_RID, 50, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_FRAG_ROOT, 51, void, bat, oid, oid, oid_nil)@
+@:[EMAIL PROTECTED](_QN_HISTOGRAM, 52, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_PREFIX_URI_LOC,53, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_URI_LOC, 54, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_PREFIX, 55, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_URI, 56, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_QN_LOC, 57, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_TEXT, 58, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_COM, 59, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_INS, 60, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_TGT, 61, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_PROP_VAL, 62, void, bat, void, str, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_OWN, 63, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_QN, 64, void, bat, void, oid, PRE_BASE)@
+@:[EMAIL PROTECTED](_ATTR_PROP, 65, void, bat, void, oid, PRE_BASE)@
@-
The bottom segment are the master copies of the top segment.
The middle segment is private to the query.
@@ -374,7 +372,7 @@
BEWARE: the logger version number below needs to be incremented whenever the
working set schema is changed.
@= ws_decl
-@:[EMAIL PROTECTED](WS_SIZE, 67)@
+@:[EMAIL PROTECTED](WS_SIZE, 66)@
@:[EMAIL PROTECTED](QNAME, 2)@
@:[EMAIL PROTECTED](BOOL, 3)@
@:[EMAIL PROTECTED](INT, 4)@
@@ -396,7 +394,7 @@
@:ws_decl(mil)@
# logger version number, needs to be incremented whenever the working set
schema is changed
-const LOGGER_VERSION := 2;
+const LOGGER_VERSION := 1;
# KIND constants, carefully chosen
# atomic value items can be retrieved with 'kind.select(int_nil,ATOMIC)'
@@ -570,7 +568,7 @@
# there is just a global lock for this.
var pf_extend := pflock_get(3); # protects the counter
var pf_extend_cnt := 0; # counter is used by nonexclusive acces (first gets
barrier, last releases)
-var pf_extend_barrier := sema(ptr(pflock_get(9))); # the barrier
+var pf_extend_barrier := sema(ptr(pflock_get(4))); # the barrier
# master bat *UPDATES* must be protected against checkpoints (global BBP
subcommits + log restart)
# master updates take a non-exclusive pf_chkpt lock; pf_checkpoint() takes an
exclusive access
@@ -635,11 +633,6 @@
commitBAT.append("doc_collection");
commitBAT.append("doc_timestamp");
commitBAT.append("uri_lifetime");
- commitBAT.append("xrpc_qids");
- commitBAT.append("xrpc_wsids");
- commitBAT.append("xrpc_timeouts");
- commitBAT.append("xrpc_locks");
- commitBAT.append("xrpc_statuss");
var ok := false;
lock_set(pf_wal);
var err := CATCH(ok := subcommit(commitBAT));
@@ -717,14 +710,6 @@
var uri_lifetime; # caching rules
-# meta-data BATs for multi-request XRPC transactions
-var xrpc_qids; # bat[void,str] query-id (host|timestamp)
-var xrpc_timeouts; # bat[void,lng] query timeout in msec
-var xrpc_wsids; # bat[void,lng] ID of the workingset associated with
this query
-var xrpc_locks; # bat[void,lock] lock for the workingset of this query
-var xrpc_statuss; # bat[void,str] 2PC status of this query: "exec",
"prepare", "commit" or "abort"
-var xrpclock := pflock_get(6); # master XRPC lock, to protect above xrpc_* BATs
-
# initialize persistent bats (use BBP as global mechanism to discover
initialization)
if (isnil(CATCH(bat("doc_name").count()))) {
collection_name := bat("collection_name");
@@ -734,11 +719,6 @@
doc_location := bat("doc_location");
doc_timestamp := bat("doc_timestamp");
uri_lifetime := bat("uri_lifetime");
- xrpc_qids := bat("xrpc_qids");
- xrpc_timeouts := bat("xrpc_timeouts");
- xrpc_wsids := bat("xrpc_wsids");
- xrpc_locks := bat("xrpc_locks");
- xrpc_statuss := bat("xrpc_statuss");
# check that this database was created with a compatible pagesize as
requested now
var pagebits := 16;
@@ -754,11 +734,6 @@
doc_location := new(oid,str).persists(true).rename("doc_location");
doc_timestamp :=
new(oid,timestamp).persists(true).rename("doc_timestamp");
uri_lifetime := new(str,lng).insert("tmp",
1LL).persists(true).rename("uri_lifetime");
- xrpc_qids := new(void,str).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_qids");
- xrpc_timeouts := new(void,lng).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_timeouts");
- xrpc_wsids := new(void,lng).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_wsids");
- xrpc_locks := new(void,lock).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_locks");
- xrpc_statuss := new(void,str).seqbase([EMAIL
PROTECTED]).persists(true).rename("xrpc_statuss");
doc_collection.insert(DOCID_CURID_HACK, DOCID_MIN); # hack: store next oid
as in invalid tuple
doc_collection.insert(DOCID_PGBIT_HACK, oid(REMAP_PAGE_BITS)); # hack:
store pagesize also
pf_checkpoint(bat(void,str).append("uri_lifetime"));
@@ -4041,7 +4016,7 @@
/* exports for XRPC */
pathfinder_export void xquery_client_engine(mapi_client*);
-pathfinder_export char* xquery_method(mapi_client*, int, char*, lng, char*,
char*, char*, lng, lng, lng**, str*, str*, BAT*);
+pathfinder_export char* xquery_method(mapi_client*, int, char*, char*, char*,
lng, lng, lng**, str*, str*, BAT*);
pathfinder_export void xquery_client_end(mapi_client *, char *);
#endif
@@ -4519,7 +4494,7 @@
* - resolve a method call in the current xquery context (return NULL if not
resolved)
*
* char*
- * xquery_function_call(xquery_client *ctx, lng usec, char* qid, lng timeout,
char *ns, char *method,
+ * xquery_function_call(xquery_client *ctx, lng usec, char *ns, char *method,
* int argc, int itercnt, int** argcnt, char** argtpe,
char** argval, BAT *shredBAT)
* - call a function ns:method(). try to use the function cache (ie re-use a
cached MIL tree).
* otherwise generate MIL yourself, interpret it (and cache it). Returns
error string (NULL if ok).
@@ -4689,8 +4664,6 @@
static char*
xquery_function_call(xquery_client *ctx,
lng usec,
- char *qid,
- lng timeout,
char *ns,
char *method,
lng argc,
@@ -4744,16 +4717,11 @@
if (fun->mil == NULL) {
MT_set_lock(pf_cache_lock, "xquery_function_call");
if (fun->mil == NULL) {
- int ret;
-
/* create working set */
- if(qid && *qid) { /* multi-request XRPC transaction */
- ret = PFstartMIL_XRPCTrans(cur, XQUERY_BUFSIZE-(cur-mil),
fun->sig->update, qid, timeout);
- if(ret > 0) cur += ret;
- } else {
- src = (char*) PFstartMIL(fun->sig->update);
- while(*src && cur < end) *cur++ = *src++;
- }
+ int ret;
+
+ src = (char*) PFstartMIL(fun->sig->update);
+ while(*src && cur < end) *cur++ = *src++;
if (shredBAT) {
/* add shredded RPC request message to the working set */
@@ -4769,11 +4737,7 @@
if (ret > 0) cur += ret;
/* destroy working set */
- if(qid && *qid){ /* multi-request XRPC transaction */
- ret = PFstopMIL_XRPCTrans(cur, XQUERY_BUFSIZE-(cur-mil),
fun->sig->update, qid);
- } else {
- ret = snprintf(cur, XQUERY_BUFSIZE-(cur-mil),
PFstopMIL(fun->sig->update));
- }
+ ret = snprintf(cur, XQUERY_BUFSIZE-(cur-mil),
PFstopMIL(fun->sig->update));
mil[XQUERY_BUFSIZE - 1] = 0;
if (ret > 0) cur += ret;
@@ -4884,7 +4848,7 @@
stream_write(ctx->fderr, fun->mil, strlen(fun->mil), 1);
}
- /* set the MIL shredBAT and genType variables to the actual values */
+ /* set the MIL shredBAT and getType variables to the actual values */
ctx->shredBAT[0] = shredBAT?shredBAT->batCacheid:0;
*(ctx->time_compile) = GDKusec() - usec;
@@ -5767,7 +5731,7 @@
lng* cnt_ptr = cnt;
char nsbak = *nsend, locbak = *locend;
*nsend = 0; *locend = 0;
- err = xquery_function_call(ctx, usec, NULL, 0, ns, loc,
argc, 1, &cnt_ptr, tpe, param, NULL);
+ err = xquery_function_call(ctx, usec, ns, loc, argc, 1,
&cnt_ptr, tpe, param, NULL);
*nsend = nsbak; *locend = locbak;
}
}
@@ -6101,7 +6065,6 @@
/* xquery_method : execute a loop-lifted xquery function
*
- * flags = 1: timing, 2: serialization mode, 4: debug
* argc = #params
* itercnt = #iterations
* argcnt[iter] = #items per param
@@ -6116,8 +6079,6 @@
char*
xquery_method(mapi_client *mc,
int flags,
- char* qid,
- lng timeout,
char* module,
char* uri,
char* method,
@@ -6145,7 +6106,8 @@
err = xquery_change_genType(ctx, mode);
if (err) return err;
- if (flags&4) { /* write debug output */
+ if (argc >= 1000) {
+ /* hack: pass argc+1000 and you get debug output */
s = open_wastream("/tmp/xrpc.mil");
char *prologue = (char*) PFinitMIL();
if (s) {
@@ -6153,13 +6115,14 @@
ctx->mode |= XQ_DEBUG;
}
stream_write(ctx->fderr, prologue, strlen(prologue), 1);
+ argc = argc % 1000;
}
if (err == NULL && module){
err = xquery_module_load(ctx, ns="xrpc", module, uri);
}
if (err == NULL) {
- err = xquery_function_call(ctx, usec, qid, timeout, ns, method, argc,
itercnt, argcnt, argtpe, argval, shredBAT);
+ err = xquery_function_call(ctx, usec, ns, method, argc, itercnt,
argcnt, argtpe, argval, shredBAT);
if (err == (char*) -1) err = "xquery_method: function could not be
resolved.\n";
else if (err == xquery_function_error) err =
xquery_nondescriptive_error;
}
U xrpc_common.mx
Index: xrpc_common.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_common.mx,v
retrieving revision 1.1.6.1
retrieving revision 1.1.6.2
diff -u -d -r1.1.6.1 -r1.1.6.2
--- xrpc_common.mx 25 May 2008 21:29:25 -0000 1.1.6.1
+++ xrpc_common.mx 25 May 2008 21:49:55 -0000 1.1.6.2
@@ -26,14 +26,6 @@
@a Ying Zhang
@t Includes header files, definitions shared by the XRPC server and XRPC client
[EMAIL PROTECTED]
-.MODULE xrpc_common;
-
-.COMMAND my_hostname() : str = CMDmy_hostname;
-"Returns the hostname of the localhost."
-
-.END xrpc_common;
-
@h
#ifndef XRPC_COMMON_H
#define XRPC_COMMON_H
@@ -54,20 +46,14 @@
#include <sys/select.h>
#include <sys/types.h> /* used by socket */
#include <sys/socket.h>
- #include <unistd.h> /* gethostname() */
+ #include <unistd.h>
#include <netinet/in.h> /* hton and ntoh */
#include <arpa/inet.h> /* dotted IP addr to and from 32-bits int */
- #include <netdb.h> /* gethostbyname(), h_errno */
+ #include <netdb.h> /* convert domain names into IP addr */
#include <errno.h>
#include <ctype.h>
#endif
-#define ISOLATION_NONE 1
-#define ISOLATION_REPEATABLE 2
-#define ISOLATION_2PC 3
-
-#define XRPCLOCK_IDX 6
-
#define XRPC_REQ_CALLBACK "/xrpc"
#define MXQ_ADMIN "http://monetdb.cwi.nl/XQuery/admin/"
@@ -93,7 +79,6 @@
" xrpc:arity=\"%lld\"" \
" xrpc:iter-count=\"%lld\"" \
" xrpc:updCall=\"%s\">"
-#define XRPC_QID "<xrpc:queryID xrpc:host=\"%s\" xrpc:timestamp=\"%s\"
xrpc:timeout=\"%s\"/>"
#define XRPC_HTTP_CALL "<xrpc:call>" \
"<xrpc:sequence>" \
@@ -118,178 +103,5 @@
"</env:Body>" \
"</env:Envelope>\n"
-/* exports for xrpc_server.mx and serialize_dflt.mx */
-int check_timeout_by_idx(oid qid_idx);
-int check_timeout_by_qid(char *qid, lng timestamp, lng timeout);
-
#endif /* XRPC_COMMON_H */
[EMAIL PROTECTED]
-#include "pf_config.h"
-#include "xrpc_common.h"
-#include "pf_support.h"
-
-int CMDmy_hostname(char **res)
-{
- int ret = 0, len = HOST_NAME_MAX > 255 ? HOST_NAME_MAX : 255;
- char err[1024];
-
- char *hname = GDKmalloc(len);
- if(!hname) {
- GDKerror("CMDmy_hostname: failed to malloc 'hname'\n");
- return GDK_FAIL;
- }
-
- errno = 0;
- ret = gethostname(hname, len);
- if(ret < 0) {
- snprintf(err, 1024, "CMDmy_hostname: gethostname() failed: %s.\n",
strerror(errno));
- return GDK_FAIL;
- }
-
- *res = hname;
- return GDK_SUCCEED;
-}
-
-/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
-int
-check_timeout_by_qid(char *qid, lng timestamp, lng timeout)
-{
- MT_Lock *xrpclock = NULL;
- int xrpclock_idx = XRPCLOCK_IDX;
- BAT *xrpc_qids = NULL, *xrpc_statuss = NULL;
- BATiter qidsi;
- oid qid_idx = oid_nil;
- BUN bun_qid = BUN_NONE;
-
- if((timestamp + (timeout * 1000)) > GDKusec())
- return 0; /* not timed out */
-
- if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL) {
- GDKerror("check_timeout_by_qid: "
- "failed to get the global lock 'xrpclock'!\n");
- return -2;
- }
- MT_lock_set(xrpclock, "check_timeout_by_qid");
-
- xrpc_qids = BATdescriptor(BBPindex("xrpc_qids"));
- xrpc_statuss = BATdescriptor(BBPindex("xrpc_statuss"));
- if(!xrpc_qids || !xrpc_statuss) {
- if(xrpc_qids) BBPunfix(BBPcacheid(xrpc_qids));
- if(xrpc_statuss) BBPunfix(BBPcacheid(xrpc_statuss));
- MT_lock_unset(xrpclock, "check_timeout_by_qid");
- GDKerror("check_timeout_by_qid: "
- "could not find the xrpc_* meta-BATs!\n");
- return -2;
- }
-
- if((bun_qid = BUNfnd(BATmirror(xrpc_qids), qid)) == BUN_NONE) {
- BBPunfix(BBPcacheid(xrpc_qids));
- BBPunfix(BBPcacheid(xrpc_statuss));
- MT_lock_unset(xrpclock, "check_timeout_by_qid");
- GDKerror("check_timeout_by_qid: "
- "could not find QID \"%s\" in the meta-BAT "
- "\"xrpc_qids\"\n", qid);
- return -1;
- }
-
- qidsi = bat_iterator(xrpc_qids);
- qid_idx = *(oid*)BUNtail(qidsi, bun_qid);
- if(!BUNreplace(xrpc_statuss, &qid_idx, "timedout", FALSE)){
- BBPunfix(BBPcacheid(xrpc_qids));
- BBPunfix(BBPcacheid(xrpc_statuss));
- MT_lock_unset(xrpclock, "check_timeout_by_qid");
- GDKerror("check_timeout_by_qid: "
- "failed to change the status of query \"%s\" into "
- "\"timedout\"\n", qid);
- return -2;
- }
-
- BBPunfix(BBPcacheid(xrpc_qids));
- BBPunfix(BBPcacheid(xrpc_statuss));
- MT_lock_unset(xrpclock, "check_timeout_by_qid");
- return 1; /* timed out */
-}
-
-/* return: 1: timedout; 0: not timedout; -1: not found; -2: other errors */
-int
-check_timeout_by_idx(oid qid_idx)
-{
- MT_Lock *xrpclock = NULL;
- int xrpclock_idx = XRPCLOCK_IDX, ret = -2;
- BAT *xrpc_qids = NULL, *xrpc_timeouts = NULL, *xrpc_statuss = NULL;
- BATiter qidsi, timeoutsi;
- BUN bun_qid = BUN_NONE, bun_to = BUN_NONE;
- char *qid = NULL, *ts_str = NULL, *to_str = NULL;
- lng ts = 0, to = 0;
- char err[1024];
-
- if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL) {
- GDKerror("check_timeout_by_idx: "
- "failed to get the global lock 'xrpclock'!\n");
- return ret;
- }
-
- MT_lock_set(xrpclock, "check_timeout_by_idx");
-
- xrpc_qids = BATdescriptor(BBPindex("xrpc_qids"));
- xrpc_timeouts = BATdescriptor(BBPindex("xrpc_timeouts"));
- xrpc_statuss = BATdescriptor(BBPindex("xrpc_statuss"));
- if(!xrpc_qids || !xrpc_timeouts || !xrpc_statuss) {
- snprintf(err, 1024, "check_timeout_by_idx: failed to retrieve "
- "the xrpc_* meta-BATs!\n");
- goto unlock_ret;
- }
-
- qidsi = bat_iterator(xrpc_qids);
- timeoutsi = bat_iterator(xrpc_timeouts);
- BUNfndVOID(bun_qid, qidsi, &qid_idx);
- BUNfndVOID(bun_to, timeoutsi, &qid_idx);
- if(bun_qid == BUN_NONE || bun_to == BUN_NONE) {
- snprintf(err, 1024, "check_timeout_by_idx: "OIDFMT"-th BUN not "
- "found in the xrpc_* meta-BATs!\n", qid_idx);
-
- ret = -1;
- goto unlock_ret;
- }
-
- qid = (char*)BUNtail(qidsi, bun_qid);
- to_str = (char*)BUNtail(timeoutsi, bun_to); assert(qid && to_str);
- ts_str = strrchr(qid, '|'); assert(ts_str);
- errno = 0; ts = strtoll(ts_str, NULL, 10);
- if(errno) {
- snprintf(err, 1024, "check_timeout_by_idx: invalid value of "
- "timestamp: \"%s\"!\n", ts_str);
- goto unlock_ret;
- }
- errno = 0; to = strtoll(to_str, NULL, 10);
- if(errno) {
- snprintf(err, 1024, "check_timeout_by_idx: invalid value of "
- "timeout: \"%s\"!\n", to_str);
- goto unlock_ret;
- }
-
- if((ts + (to * 1000)) > GDKusec()) {
- ret = 0; /* not timed out */
- goto unlock_ret;
- }
-
- if(!BUNreplace(xrpc_statuss, &qid_idx, "timedout", FALSE)){
- snprintf(err, 1024, "check_timeout_by_idx: failed to change "
- "the status of query \"%s\" into "
- "\"timedout\"\n", qid);
- goto unlock_ret;
- }
-
- ret = 1; /* timed out */
- goto unlock_ret;
-
-unlock_ret:
- if(xrpc_qids) BBPunfix(BBPcacheid(xrpc_qids));
- if(xrpc_timeouts) BBPunfix(BBPcacheid(xrpc_timeouts));
- if(xrpc_statuss) BBPunfix(BBPcacheid(xrpc_statuss));
- MT_lock_unset(xrpclock, "check_timeout_by_idx");
- if(ret < 0)
- GDKerror(err);
- return ret;
-}
U pf_support.mx
Index: pf_support.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/pf_support.mx,v
retrieving revision 1.299.2.1
retrieving revision 1.299.2.2
diff -u -d -r1.299.2.1 -r1.299.2.2
--- pf_support.mx 25 May 2008 21:29:25 -0000 1.299.2.1
+++ pf_support.mx 25 May 2008 21:49:51 -0000 1.299.2.2
@@ -8111,7 +8111,7 @@
@c
#define PF_CONVOY 5
BAT* ws_overlaps_ws = NULL;
-MT_Lock pf_runtime_lock[7];
+MT_Lock pf_runtime_lock[6];
MT_Sema pf_runtime_sema[3];
int pf_nreaders, pf_convoy, pf_ndocmgt;
lng pf_writer, pf_special;
@@ -8122,7 +8122,6 @@
#define PF_EXTEND_LOCK pf_runtime_lock[3]
#define PF_META_LOCK pf_runtime_lock[4]
#define PF_UPDATE_LOCK pf_runtime_lock[5]
-#define PF_XRPC_LOCK pf_runtime_lock[6]
#define PF_META_BARRIER pf_runtime_sema[0]
#define PF_UPDATE_BARRIER pf_runtime_sema[1]
#define PF_EXTEND_BARRIER pf_runtime_sema[2]
@@ -8134,8 +8133,8 @@
/* get handle to the shared locks */
int CMDpflock_get(ptr *ret, int* nr) {
- *ret = (ptr) &pf_runtime_lock[(*nr)&6];
- if (*nr > 6) *ret = (ptr) &PF_EXTEND_BARRIER; /* hack: do not feel like
creating a CMDsema_get */
+ *ret = (ptr) &pf_runtime_lock[(*nr)&3];
+ if (*nr > 3) *ret = (ptr) &PF_EXTEND_BARRIER; /* hack: do not feel like
creating a CMDsema_get */
return GDK_SUCCEED;
}
U xrpc_client.mx
Index: xrpc_client.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_client.mx,v
retrieving revision 1.45.2.1
retrieving revision 1.45.2.2
diff -u -d -r1.45.2.1 -r1.45.2.2
--- xrpc_client.mx 25 May 2008 21:29:25 -0000 1.45.2.1
+++ xrpc_client.mx 25 May 2008 21:49:55 -0000 1.45.2.2
@@ -40,19 +40,14 @@
@- HTTP client function(s)
@m
-
-.USE lock;
-
.COMMAND http_post(
str options,
- str qid,
- lng timeout,
[...1311 lines suppressed...]
+ if(!shredBAT)
return GDK_FAIL;
- }
- shredBAT = response2bat(timing, in, dst, port, *updCall);
-
- stream_close(out); stream_destroy(out);
- stream_close(in); stream_destroy(in);
- buffer_destroy(req);
-
- if(!shredBAT) return GDK_FAIL;
-
- if(qid && *qid) {
- if(add_xrpc_hosts(shredBAT, ws) == GDK_FAIL) {
- BBPreclaim(shredBAT);
- return GDK_FAIL;
- }
- }
*res = shredBAT;
return GDK_SUCCEED;
U serialize_dflt.mx
Index: serialize_dflt.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/serialize_dflt.mx,v
retrieving revision 1.46.2.1
retrieving revision 1.46.2.2
diff -u -d -r1.46.2.1 -r1.46.2.2
--- serialize_dflt.mx 25 May 2008 21:29:25 -0000 1.46.2.1
+++ serialize_dflt.mx 25 May 2008 21:49:53 -0000 1.46.2.2
@@ -40,7 +40,7 @@
/* contains dummy callback functions */
#include "serialize_null.h"
#include "pathfinder.h"
-#include "xrpc_common.h" /* *_NS defs, check_timeout_by_idx() */
+#include "xrpc_common.h"
/* a lot of characters, static strings, and their sizes
(the idea is to reuse constant character pointers during
@@ -996,147 +996,21 @@
return SUCCESS;
}
-static bool
-xrpc_serializeQID(XqueryCtx *ctx, char *qid_idx)
-{
- MT_Lock *xrpclock = NULL;
- int xrpclock_idx = XRPCLOCK_IDX, ret = 0;
- BAT *xrpc_qids = NULL, *xrpc_timeouts = NULL;
- BATiter qidi, toi;
- char *qid = NULL, *ts = NULL;
- lng timeout = 0;
- BUN bun_qid = BUN_NONE, bun_to = BUN_NONE;
- oid idx = oid_nil;
-
- if(CMDpflock_get((ptr)&xrpclock, &xrpclock_idx) == GDK_FAIL)
- return PROBLEM;
-
- MT_lock_set(xrpclock, "xrpc_serializeQID");
- if((xrpc_qids = BATdescriptor(BBPindex("xrpc_qids")))) {
- if((xrpc_timeouts = BATdescriptor(BBPindex("xrpc_timeouts")))) {
- errno = 0;
- ret = strtol(qid_idx, NULL, 10);
- if(errno) {
- GDKerror(strerror(errno));
- goto err_ret;
- }
-
- idx = (oid) ret;
- qidi = bat_iterator(xrpc_qids);
- toi = bat_iterator(xrpc_timeouts);
- BUNfndVOID(bun_qid, qidi, &idx);
- BUNfndVOID(bun_to, toi, &idx);
- if(bun_qid == BUN_NONE || bun_to == BUN_NONE)
- goto err_ret;
-
- qid = (char*)BUNtail(qidi, bun_qid);
- timeout = *(lng*)BUNtail(toi, bun_to);
- ts = strrchr(qid, '|'); assert(ts);
- ts[0] = '\0'; ts++; /* separate 'timestamp' from 'host:port' */
-
- ret = stream_printf(ctx->out,
- "<xrpc:queryID host=\"%s\" timestamp=\"%s\"
timeout=\""LLFMT"\"/>",
- qid, ts, timeout);
-
- ts--; ts[0] = '|';
-
- if(ret < 0) goto err_ret;
- BBPunfix(BBPcacheid(xrpc_qids));
- BBPunfix(BBPcacheid(xrpc_timeouts));
- MT_lock_unset(xrpclock, "xrpc_serializeQID");
- return SUCCESS;
-
- }
- BBPunfix(BBPcacheid(xrpc_qids));
- }
- MT_lock_unset(xrpclock, "xrpc_serializeQID");
- return PROBLEM;
-
-err_ret:
- BBPunfix(BBPcacheid(xrpc_qids));
- BBPunfix(BBPcacheid(xrpc_timeouts));
- MT_lock_unset(xrpclock, "xrpc_serializeQID");
- return PROBLEM;
-}
-
-static bool
-xrpc_serializeSuccessors(XqueryCtx *ctx, BAT *sucsBAT)
-{
- oid i, cnt = BATcount(sucsBAT);
- char *host = NULL;
- BATiter sucsBATi = bat_iterator(sucsBAT);
-
- if(stream_write(ctx->out, "<xrpc:participants>", 1, 19) != 19)
- return PROBLEM;
- for(i = 0; i < cnt; i++){
- host = (char *) BUNtail(sucsBATi, i);
- stream_printf(ctx->out, "<xrpc:participant>%s</xrpc:participant>",
host);
- }
- if(stream_write(ctx->out, "</xrpc:participants>", 1, 20) != 20)
- return PROBLEM;
-
- return SUCCESS;
-}
-
/**
* Start serialization of XRPC response message
*/
static bool
xrpc_startSerialize(XqueryCtx *ctx)
{
- int ret = 0;
- BUN bun = BUN_NONE;
- BAT *sucsBAT = NULL;
- BATiter wsi;
- oid batID = XRPC_SUCCESSOR, idx = oid_nil;
- char *name = NULL;
-
- /* Check if this XRPC query requires isolation. If yes, we need to
- * serialize 'queryID' and 'participants'. */
- wsi = bat_iterator(ctx->ws);
- BUNfndOID(bun, wsi, (ptr)&batID);
- if(bun == BUN_NONE) return PROBLEM;
- if(!(sucsBAT = BATdescriptor(*(bat*)Tloc(ctx->ws,bun))))
- return PROBLEM;
- if(!(name = BBPname(BBPcacheid(sucsBAT)))) {
- BBPunfix(BBPcacheid(sucsBAT));
- return PROBLEM;
- }
-
- /* check if the query has timed out or not, before printing results */
- if(strncmp(name, "tmp_", 4) != 0) {
- errno = 0;
- idx = (oid) strtol(name, NULL, 10);
- if(errno) {
- GDKerror("xrpc_startSerialize: failed to convert the name "
- "of the BAT ws[XRPC_SUCCESS] to an oid: %s.\n",
- strerror(errno));
- return PROBLEM;
- }
-
- ret = check_timeout_by_idx(idx);
- if(ret < 0) {
- return PROBLEM;
- } else if(ret == 1) {
- GDKerror("xrpc_startSerialize: xrpc-query timed out\n");
- return PROBLEM;
- }
- }
-
- ret = stream_write(ctx->out,
- "HTTP/1.1 200 OK\r\n"
- "Content-type: text/xml; charset=\"utf-8\"\r\n\r\n", 1, 60);
- if(ret != 60) return PROBLEM;
-
/* We need to prepend "=" to each line, so make sure
that we print one after each newline (encoded in
newline string 'newline')->
In addition we print one '=' at the beginning-> */
- if (ctx->modes & MODE_MAPI)
- if((ret = stream_write (ctx->out, &e_, 1, 1)) != 1)
- return PROBLEM;
+ if (ctx->modes & MODE_MAPI){
+ stream_write (ctx->out, &e_, 1, 1);
+ }
- ret = stream_printf(ctx->out,
+ stream_printf(ctx->out,
"<?xml version=\"1.0\" encoding=\"utf-8\"?>%s"
"<env:Envelope "
"xmlns:env=\"%s\" "
@@ -1153,20 +1027,7 @@
XSI_NS,
XRPC_NS, XRPC_LOC,
dflt_ws->module, dflt_ws->method);
- if(ret < 0) return PROBLEM;
-
- if(strncmp("tmp_", name, 4) != 0) {
- /* This is a 2PC-XRPC query! The name of the ws[XRPC_SUCCESS] BAT
- * encodes where to find the query meta-data in the xrpc_* BATs. */
- if(xrpc_serializeQID(ctx, name) == PROBLEM ||
- xrpc_serializeSuccessors(ctx, sucsBAT) == PROBLEM) {
- BBPunfix(BBPcacheid(sucsBAT));
- return PROBLEM;
- }
- }
-
- BBPunfix(BBPcacheid(sucsBAT));
return SUCCESS;
}
U shredder.mx
Index: shredder.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/shredder.mx,v
retrieving revision 1.136.2.1
retrieving revision 1.136.2.2
diff -u -d -r1.136.2.1 -r1.136.2.2
--- shredder.mx 25 May 2008 21:29:25 -0000 1.136.2.1
+++ shredder.mx 25 May 2008 21:49:54 -0000 1.136.2.2
@@ -1560,7 +1560,7 @@
if (shredCtx == NULL) return NULL;
/* fill it */
- int i = 9;
+ int i = 4;
memset(shredCtx, 0, sizeof(shredCtxStruct));
shredCtx->location = location;
shredCtx->fileSize = fileSize;
U Makefile.ag
Index: Makefile.ag
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/Makefile.ag,v
retrieving revision 1.86.2.1
retrieving revision 1.86.2.2
diff -u -d -r1.86.2.1 -r1.86.2.2
--- Makefile.ag 25 May 2008 21:29:23 -0000 1.86.2.1
+++ Makefile.ag 25 May 2008 21:49:50 -0000 1.86.2.2
@@ -55,7 +55,7 @@
pf_support.mx \
shredder.mx
LIBS = \
- -l_xrpc_common libserialize libpf $(PF_LIBS) \
+ libserialize libpf $(PF_LIBS) \
$(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS) -lmonet
$(PTHREAD_LIBS) \
$(MONETDB4_MODS) -l_lock -l_monettime -l_streams -l_builtin
-l_ascii_io -l_algebra -l_constant
}
@@ -98,17 +98,11 @@
$(MONETDB4_MODS) -l_logger -l_streams -l_builtin -l_ascii_io
-l_algebra -l_sys -l_constant -l_mapi
}
-lib__xrpc_common = {
- DIR = libdir/MonetDB4
- SOURCES = xrpc_common.mx
- LIBS = \
- $(SOCKET_LIBS) \
- $(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS)
-}
-
lib__xrpc_client = {
DIR = libdir/MonetDB4
- SOURCES = xrpc_client.mx
+ SOURCES = \
+ xrpc_common.mx \
+ xrpc_client.mx
LIBS = \
libserialize $(PF_LIBS) $(SOCKET_LIBS)
./lib_pf_support ./lib_pathfinder \
$(MONETDB_LIBS) -lbat -lstream $(MONETDB4_LIBS)
-lmonet \
@@ -119,6 +113,7 @@
DIR = libdir/MonetDB4
SOURCES = \
shttpd.c shttpd.h \
+ xrpc_common.mx \
xrpc_server.mx
LIBS = \
../compiler/libcompiler1 \
@@ -145,5 +140,5 @@
headers_mil = {
HEADERS = mil
DIR = libdir/MonetDB4
- SOURCES = pathfinder.mx pf_support.mx pf_standoff.mx xrpc_client.mx
xrpc_server.mx xrpc_common.mx
+ SOURCES = pathfinder.mx pf_support.mx pf_standoff.mx xrpc_client.mx
xrpc_server.mx
}
U xrpc_server.mx
Index: xrpc_server.mx
===================================================================
RCS file: /cvsroot/monetdb/pathfinder/runtime/xrpc_server.mx,v
retrieving revision 1.68.2.1
retrieving revision 1.68.2.2
diff -u -d -r1.68.2.1 -r1.68.2.2
--- xrpc_server.mx 25 May 2008 21:29:26 -0000 1.68.2.1
+++ xrpc_server.mx 25 May 2008 21:49:56 -0000 1.68.2.2
@@ -116,8 +116,8 @@
}
ADDHELP("get_xrpc_options", "zhang", "March 2007",
"DESCRIPTION:\n\
-Find the options that should be pass to the XRPC server, currently \
-\"timing\" or \"debug\"",
+Find the options that should be pass to the XRPC server, currently only\
+\"timing\"",
"xrpc_server");
PROC rpcd_start() : void {
[...1240 lines suppressed...]
+ if (option && strstr(option, "timing") != NULL)
+ timing = 1;
/* Register call back function, for XRPC (admin) requests, and XML
(get/put/delete) file handling */
shttpd_register_url(XRPC_REQ_CALLBACK, xrpc_fork_mapiclient, NULL);
@@ -1360,14 +1141,6 @@
ctx = shttpd_open_port(xrpc_port, *open);
listen_socket = ctx.sock;
- /* Check for expired queries every 5 seconds */
- sigfillset(&set); /* include all signals in 'set' */
- act.sa_handler = check_timeout;
- act.sa_mask = set; /* block all other signals */
- act.sa_flags = 0;
- sigaction(SIGALRM, &act, NULL);
- alarm(50000000);
-
/* Serve connections infinitely until someone kills us */
for ( ; rpcd_running; ) shttpd_poll(&ctx, 200);
-------------------------------------------------------------------------
This SF.net email is sponsored by: Microsoft
Defy all challenges. Microsoft(R) Visual Studio 2008.
http://clk.atdmt.com/MRT/go/vse0120000070mrt/direct/01/
_______________________________________________
Monetdb-pf-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-pf-checkins