Update of /cvsroot/monetdb/sql/src/storage/restrict
In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv16119/restrict

Added Files:
        Makefile.ag restrict_logger.mx restrict_storage.mx 
        restrict_table.mx 
Log Message:
This checkin adds 3 new storage solutions, all should only be used
in restricted situtations 
        store_su (sql_debug&64) 
                'single user mode, all changes are directly applied 
                 (one copy less)'
        store_ro (sql_debug&32)
                'read only access, no changes, ie not Update and insert bats'
        store_suro ((sql_debug&96)==96)
                (single user, read only) 


--- NEW FILE: Makefile.ag ---
# The contents of this file are subject to the MonetDB Public License
# Version 1.1 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
#
# Software distributed under the License is distributed on an "AS IS"
# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
# License for the specific language governing rights and limitations
# under the License.
#
# The Original Code is the MonetDB Database System.
#
# The Initial Developer of the Original Code is CWI.
# Portions created by CWI are Copyright (C) 1997-2007 CWI.
# All Rights Reserved.

## Process this file with automake to produce Makefile.in

INCLUDES = .. ../../include ../../common ../bat $(MONETDB_INCS)

lib_restrictstore = {
        NOINST
        DIR = libdir
        SOURCES = \
                restrict_table.mx restrict_storage.mx restrict_logger.mx 
}

--- NEW FILE: restrict_table.mx ---
@' The contents of this file are subject to the MonetDB Public License
@' Version 1.1 (the "License"); you may not use this file except in
@' compliance with the License. You may obtain a copy of the License at
@' http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
@'
@' Software distributed under the License is distributed on an "AS IS"
@' basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
@' License for the specific language governing rights and limitations
@' under the License.
@'
@' The Original Code is the MonetDB Database System.
@'
@' The Initial Developer of the Original Code is CWI.
@' Portions created by CWI are Copyright (C) 1997-2007 CWI.
@' All Rights Reserved.

@f restrict_table
@+ The read only table store
The read only table stores tables using, Monet's binary association 
tables (BATs). In the global view of the database, a table with columns Cj 
with 1<j<n, is stored on disk using the following set of BATs. 
Each column has one base bat (Bj), containing the current column values. 
As we like to use void bats we cannot use the bat delete functionality, so we 
keep an extra bat with the deleted oids (D). As there are no updates in the
read only store, no more bats etc are needed.

@h
#ifndef RESTRICT_TABLE_H
#define RESTRICT_TABLE_H

#include "sql_storage.h"

/* initialize restrict storage call back functions interface */
extern int su_table_init( table_functions *tf );
extern int ro_table_init( table_functions *tf );
extern int suro_table_init( table_functions *tf );

#endif /*RESTRICT_TABLE_H*/
@c
#include "sql_config.h"
#include <bat/bat_utils.h>
#include "restrict_table.h"
#include "restrict_storage.h"

static BAT *
full_column(sql_column *c, BAT *d, BAT *s )
{
        sql_bat *bat = c->data;
        BAT *r, *b = temp_descriptor(bat->bid);
        if (s) {
                BAT *t = BATsemijoin(b,s); bat_destroy(b); b = t;
        }
        if (d && BATcount(d)) {
                r = BATkdiff(b,BATmirror(d)); bat_destroy(b); b = r;
        }
        return b;
}

static ssize_t
column_find_row(sql_trans *tr, sql_column *c, void *value, ...)
{
        va_list va;
        BUN q;
        BAT *b = NULL, *s = NULL, *r = NULL, *d = NULL;
        ssize_t rid = -1;
        sql_column *nc;
        void *nv;
        sql_bat *bat = c->t->data;

        if (bat->bid) 
                d = store_funcs.bind_del(tr, c->t, RDONLY);
        va_start(va, value);
        while ((nc = va_arg(va, sql_column *)) != NULL) {
                nv = va_arg(va, void *);

                b = full_column(c, d, s);
                if (s)
                        bat_destroy(s);
                s = BATselect(b, value, value);
                bat_destroy(b);
                c = nc;
                value = nv;
        }
        b = full_column(c, d, s);
        if (s)
                bat_destroy(s);
        if (d)
                bat_destroy(d);

        r = BATmirror(b);
        q = BUNfnd(r, value);
        if (q != BUN_NONE) {
                BATiter ri = bat_iterator(r);
                rid = *(oid *) BUNtail(ri, q);
        }
        bat_destroy(b);
        return rid;
}

static void *
column_find_value(sql_trans *tr, sql_column *c, ssize_t rid)
{
        BUN q;
        BAT *b, *d = NULL;
        void *res = NULL;
        sql_bat *bat = c->t->data;

        if (bat->bid) 
                d = store_funcs.bind_del(tr, c->t, RDONLY);
        b = full_column(c, d, NULL);
        if (d)
                bat_destroy(d);

        q = BUNfnd(b, (ptr) &rid);
        if (q != BUN_NONE) {
                BATiter bi = bat_iterator(b);
                void *r;
                int sz;

                res = BUNtail(bi, q);
                sz = ATOMlen(b->ttype, res);
                r = GDKmalloc(sz);
                memcpy(r,res,sz);
                res = r;
        }
        bat_destroy(b);
        return res;
}

static int
column_update_value(sql_trans *tr, sql_column *c, ssize_t rid, void *value)
{
        assert(rid != (ssize_t)oid_nil);
        assert(rid != -1);

        store_funcs.update_col(tr, c, value, c->type.type->localtype, rid);
        return 0;
}

static int
table_insert(sql_trans *tr, sql_table *t, ...)
{
        va_list va;
        node *n = cs_first_node(&t->columns);
        void *val = NULL;
        int cnt = 0;

        va_start(va, t);
        for (val = va_arg(va, void *); n && val; n = n->next, val = va_arg(va, 
void *))
        {
                sql_column *c = n->data;
                store_funcs.append_col(tr, c, val, c->type.type->localtype);
                cnt++;
        }
        t->cnt ++;
        if (n) {
                fprintf(stderr, "called table_insert(%s) with wrong number of 
args (%d,%d)\n", t->base.name, list_length(t->columns.set), cnt);
                assert(0);
                return -1;
        }
        return 0;
}

static int
table_delete(sql_trans *tr, sql_table *t, ssize_t rid)
{
        assert(rid != (ssize_t)oid_nil);
        assert(rid != -1);

        store_funcs.delete_tab(tr, t, &rid, TYPE_oid);
        return 0;
}

/* returns table rids, for the given select ranges */
static rids *
rids_select( sql_trans *tr, sql_column *key, void *key_value_low, void 
*key_value_high, ...)
{
        va_list va;
        BAT *b = NULL, *s = NULL, *d = NULL;
        sql_column *nc;
        void *nvl, *nvh;
        rids *rs = NEW(rids);
        sql_bat *bat = key->t->data;

        /* special case, key_value_low and high NULL, ie return all */
        if (bat->bid) 
                d = store_funcs.bind_del(tr, key->t, RDONLY);
        if (key_value_low || key_value_high) {
                va_start(va, key_value_high);
                while ((nc = va_arg(va, sql_column *)) != NULL) {
                        nvl = va_arg(va, void *);
                        nvh = va_arg(va, void *);
        
                        b = full_column(key, d, s);
                        if (s)
                                bat_destroy(s);
                        if (!key_value_low)
                                key_value_low = ATOMnilptr(b->ttype);
                        if (!key_value_high)
                                key_value_high = ATOMnilptr(b->ttype);
                        s = BATselect(b, key_value_low, key_value_high);
                        bat_destroy(b);
                        key = nc;
                        key_value_low = nvl;
                        key_value_high = nvh;
                }
        }
        b = full_column(key, d, s);
        if (s)
                bat_destroy(s);
        if (d)
                bat_destroy(d);
        if (key_value_low || key_value_high) {
                if (!key_value_low)
                        key_value_low = ATOMnilptr(b->ttype);
                if (!key_value_high)
                        key_value_high = ATOMnilptr(b->ttype);
                rs->data = BATselect(b, key_value_low, key_value_high);
                bat_destroy(b);
        } else {
                rs->data = b;
        }
        rs->cur = 0;
        return rs;
}

/* order rids by orderby_column values */
static rids *
rids_orderby(sql_trans *tr, rids *r, sql_column *orderby_col)
{
        BAT *b, *d = NULL;
        sql_bat *bat = orderby_col->t->data;

        if (bat->bid) 
                d = store_funcs.bind_del(tr, orderby_col->t, RDONLY);
        b = full_column(orderby_col, d, r->data);
        if (d)
                bat_destroy(d);
        bat_destroy(r->data);
        b = BATmirror(b);
        r->data = BATmirror(BATsort(b));
        bat_destroy(b);
        return r;
}


/* return table rids from result of rids_select, return (-1) when done */
static ssize_t 
rids_next(rids *r)
{
        if (r->cur < BATcount(r->data)) {
                BATiter bi = bat_iterator(r->data);
                return *(oid*)BUNhead(bi, r->cur++);
        }
        return -1;
}

static rids *
rids_join(sql_trans *tr, rids *l, sql_column *lc, rids *r, sql_column *rc)
{
        BAT *lcb, *rcb, *d = NULL;
        sql_bat *lbat, *rbat;
        
        lbat = lc->t->data;
        if (lbat->bid) 
                d = store_funcs.bind_del(tr, lc->t, RDONLY);
        lcb = full_column(lc, d, r->data);
        if (d)
                bat_destroy(d);
        rbat = rc->t->data;
        if (rbat->bid) 
                d = store_funcs.bind_del(tr, rc->t, RDONLY);
        rcb = full_column(rc, d, r->data);
        if (d)
                bat_destroy(d);
        bat_destroy(l->data);
        l->data = BATjoin(lcb, BATmirror(rcb), BATcount(lcb));
        bat_destroy(lcb);
        bat_destroy(rcb);
        return l;
}

/* clean up the resources taken by the result of rids_select */
static void 
rids_destroy(rids *r)
{
        bat_destroy(r->data);
        _DELETE(r);
}

int 
su_table_init( table_functions *tf )
{
        tf->column_find_row = column_find_row;
        tf->column_find_value = column_find_value;

        tf->column_update_value = column_update_value;
        tf->table_insert = table_insert;
        tf->table_delete = table_delete;
        
        tf->rids_select = rids_select;
        tf->rids_orderby = rids_orderby;
        tf->rids_join = rids_join;
        tf->rids_next = rids_next;
        tf->rids_destroy = rids_destroy;
        return LOG_OK;
}

int 
ro_table_init( table_functions *tf )
{
        tf->column_find_row = column_find_row;
        tf->column_find_value = column_find_value;

        tf->column_update_value = NULL;
        tf->table_insert = NULL;
        tf->table_delete = NULL;
        
        tf->rids_select = rids_select;
        tf->rids_orderby = rids_orderby;
        tf->rids_join = rids_join;
        tf->rids_next = rids_next;
        tf->rids_destroy = rids_destroy;
        return LOG_OK;
}

int 
suro_table_init( table_functions *tf )
{
        return ro_table_init(tf);
}

--- NEW FILE: restrict_storage.mx ---
@' The contents of this file are subject to the MonetDB Public License
@' Version 1.1 (the "License"); you may not use this file except in
@' compliance with the License. You may obtain a copy of the License at
@' http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
@'
@' Software distributed under the License is distributed on an "AS IS"
@' basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
@' License for the specific language governing rights and limitations
@' under the License.
@'
@' The Original Code is the MonetDB Database System.
@'
@' The Initial Developer of the Original Code is CWI.
@' Portions created by CWI are Copyright (C) 1997-2007 CWI.
@' All Rights Reserved.

@f restrict_storage

@h
#ifndef RESTRICT_STORAGE_H
#define RESTRICT_STORAGE_H

#include "sql_storage.h"
#include "restrict_logger.h"

typedef struct sql_bat {
        char *name;     /* name of the main bat */
        int bid;
        int ubid;       /* bat with old updated values (su only) */
} sql_bat;

#define bat_set_access(b,access) b->P->restricted = access
#define bat_clear(b) 
bat_set_access(b,BAT_WRITE);BATclear(b);bat_set_access(b,BAT_READ)

/* initialize bat storage call back functions interface */
extern int su_storage_init( store_functions *sf );
extern int ro_storage_init( store_functions *sf );
extern int suro_storage_init( store_functions *sf );

#endif /*RESTRICT_STORAGE_H */

@c
#include "sql_config.h"
#include "restrict_storage.h"
#include <bat/bat_utils.h>
#include <sql_string.h>

static BAT *
bind_col(sql_trans *tr, sql_column *c, int access)
{
        sql_bat *bat = c->data;
        BAT *b = temp_descriptor(bat->bid);

        assert(b);
        bat_set_access(b, BAT_READ);
        b->batDirty |= 2;
        c->base.rtime = c->t->base.rtime = c->t->s->base.rtime = tr->rtime = 
tr->stime;
        assert(access != RD_UPD && access != RD_INS);
        return b;
}

static BAT *
bind_idx(sql_trans *tr, sql_idx * i, int access)
{
        sql_bat *bat = i->data;
        BAT *b = temp_descriptor(bat->bid);

        assert(b);
        bat_set_access(b, BAT_READ);
        b->batDirty |= 2;
        i->base.rtime = i->t->base.rtime = i->t->s->base.rtime = tr->rtime = 
tr->stime;
        assert(access != RD_UPD && access != RD_INS);
        return b;
}

static BAT *
bind_del(sql_trans *tr, sql_table *t, int access)
{
        BAT *b;
        sql_bat *bat = t->data;

#ifdef NDEBUG
        (void) access; /* satisfy compiler */
#endif
        assert(access == RDONLY);
        b = temp_descriptor(bat->bid);
        assert(b);
        b->batDirty |= 2;
        t->s->base.rtime = t->base.rtime = tr->stime;
        return b;
}

/* we could have an unsafe mode which simply forgets the old values */
static void
update_bat( sql_bat *bat, BAT *upd, int isnew) 
{
        BAT *b = temp_descriptor(bat->bid);

        if (!isnew) {
                BAT *u = NULL;
                BAT *old = BATkdiff(b, upd), *r;

                if (bat->ubid) {
                        u = temp_descriptor(bat->ubid);
                } else {
                        u = bat_new(TYPE_oid, b->ttype, 1);
                        bat->ubid = temp_create(u);
                }
                r = BATkdiff(old, u); /* don't keep allready updated values */ 
                bat_destroy(old);
                BATins(u, r, FALSE);
                bat_destroy(u);
                bat_destroy(r);
        }
        void_replace_bat(b, upd, TRUE);
        bat_destroy(b);
}

static void
update_val( sql_bat *bat, ssize_t rid, void *upd, int isnew) 
{
        BAT *b;

        assert(rid != (ssize_t)oid_nil);
        assert(rid != -1);
        b = temp_descriptor(bat->bid);
        if (!isnew) {
                BAT *u = NULL;

                if (bat->ubid) {
                        u = temp_descriptor(bat->ubid);
                } else {
                        u = bat_new(TYPE_oid, b->ttype, 1);
                        bat->ubid = temp_create(u);
                }
                if (BUNfnd(u, (ptr)&rid) == BUN_NONE) {
                        BUN p;
                        BATiter bi = bat_iterator(b);
                        BUNfndVOID(p, bi, (ptr)&rid);  
                        BUNins(u, &rid, BUNtail(bi,p), FALSE);
                }
                bat_destroy(u);
        }
        void_inplace(b, rid, upd, TRUE);
        bat_destroy(b);
}

/* for now we simply apply the updates, problem is we can't rollback */

static void
update_col(sql_trans *tr, sql_column *c, void *i, int tpe, ssize_t rid)
{
        sql_bat *bat = c->data;

        c->base.wtime = c->t->base.wtime = c->t->s->base.wtime = tr->wtime = 
tr->stime;
        c->base.rtime = c->t->base.rtime = c->t->s->base.rtime = tr->rtime = 
tr->stime;
        if (tpe == TYPE_bat)
                update_bat(bat, i, isNew(c));
        else 
                update_val(bat, rid, i, isNew(c));
}

static void 
update_idx(sql_trans *tr, sql_idx * i, void *ib, int tpe)
{
        sql_bat *bat = i->data;

        i->base.wtime = i->t->base.wtime = i->t->s->base.wtime = tr->wtime = 
tr->stime;
        i->base.rtime = i->t->base.rtime = i->t->s->base.rtime = tr->rtime = 
tr->stime;
        if (tpe == TYPE_bat)
                update_bat(bat, ib, isNew(i));
        else
                assert(0);
}

static void
append_bat( sql_bat *bat, BAT *i ) 
{
        BAT *b = temp_descriptor(bat->bid);

        if (BATcount(b) == 0 && !isVIEW(i) && i->htype == TYPE_void && i->ttype 
!= TYPE_void){
                temp_destroy(bat->bid);
                bat->bid = temp_create(i);
                BATseqbase(i, 0);
                i->batDirty |= 2;
        } else {
                BATappend(b, i, TRUE);
        }
        bat_destroy(b);
}

static void
append_val( sql_bat *bat, void *i ) 
{
        BAT *b = temp_descriptor(bat->bid);

        BUNappend(b, i, TRUE);
        bat_destroy(b);
}

static void 
append_col(sql_trans *tr, sql_column *c, void *i, int tpe)
{
        sql_bat *bat = c->data;

        c->base.wtime = c->t->base.wtime = c->t->s->base.wtime = tr->wtime = 
tr->stime;
        c->base.rtime = c->t->base.rtime = c->t->s->base.rtime = tr->rtime = 
tr->stime;
        if (tpe == TYPE_bat)
                append_bat(bat, i);
        else
                append_val(bat, i);
}

static void
append_idx(sql_trans *tr, sql_idx * i, void *ib, int tpe)
{
        sql_bat *bat = i->data;

        i->base.wtime = i->t->base.wtime = i->t->s->base.wtime = tr->wtime = 
tr->stime;
        i->base.rtime = i->t->base.rtime = i->t->s->base.rtime = tr->rtime = 
tr->stime;
        if (tpe == TYPE_bat)
                append_bat(bat, ib);
        else
                append_val(bat, ib);
}

static void
delete_bat( sql_bat *bat, BAT *i ) 
{
        BAT *b = temp_descriptor(bat->bid);

        if (BATcount(b) == 0 && !isVIEW(i) && i->htype == TYPE_void && i->ttype 
!= TYPE_void){
                temp_destroy(bat->bid);
                bat->bid = temp_create(i);
                i->batDirty |= 2;
        } else {
                BATappend(b, i, TRUE);
        }
        bat_destroy(b);
}

static void
delete_val( sql_bat *bat, ptr i ) 
{
        BAT *b = temp_descriptor(bat->bid);

        BUNappend(b, i, TRUE);
        bat_destroy(b);
}

static void
delete_tab(sql_trans *tr, sql_table * t, void *ib, int tpe)
{
        sql_bat *bat = t->data;

        t->base.wtime = t->s->base.wtime = tr->wtime = tr->stime;
        t->base.rtime = t->s->base.rtime = tr->rtime = tr->stime;
        if (tpe == TYPE_bat)
                delete_bat(bat, ib);
        else
                delete_val(bat, ib);
}

static sql_bat *
load_bat( char *sname, char *tname, char *bname, int type, int sz, lng *cnt ) 
{
        BAT *b;
        sql_bat *bat = ZNEW(sql_bat);

        (void)type;
        (void)sz;
        bat->name = sql_message("%s_%s_%s", sname, tname, bname );
        b = quick_descriptor(logger_find_bat(restrict_logger, bat->name));
        bat->bid = temp_create(b);
        *cnt = BATcount(b);
        return bat;
}

static int
new_persistent_bat(sql_trans *tr, sql_bat *bat, char *sname, char *tname, char 
*bname, lng *cnt) 
{
        int ok = LOG_OK;
        BAT *b = temp_descriptor(bat->bid);

        bat->name = sql_message("%s_%s_%s", sname, tname, bname);
        bat->bid = logger_add_bat(restrict_logger, b, bat->name);

        if (tr->parent == gtrans) {
                /* snapshot large bats */
                if (BATcount(b) > 1000) 
                        BATmode(b, PERSISTENT);
                if (BATcount(b) > (size_t) REMAP_PAGE_MAXSIZE)
                        BATmmap(b, STORE_MMAP, STORE_MMAP, STORE_MMAP);
                ok = log_bat_persists(restrict_logger, b, bat->name);
        }
        *cnt = BATcount(b);
        bat_destroy(b);
        return ok;
}

static int
create_col(sql_trans *tr, sql_column *c)
{
        int ok = LOG_OK;
        int type = c->type.type->localtype;
        sql_bat *bat = c->data;

        (void)tr;
        if (c->base.flag == TR_OLD && !isTempTable(c->t)){
                c->data = load_bat( c->t->s->base.name, c->t->base.name, 
c->base.name, type, c->t->sz, &c->t->cnt);
        } else if (bat && !isTempTable(c->t)) {
                assert(active_store_type == store_su);
                return new_persistent_bat( tr, c->data, c->t->s->base.name, 
c->t->base.name, c->base.name, &c->t->cnt);
        } else {
                if (!bat)
                        bat = c->data = ZNEW(sql_bat);
                if (!bat->bid) {
                        BAT *b = bat_new(TYPE_void, type, c->t->sz);
                        if (!b) 
                                return LOG_ERR;
                        bat->bid = temp_create(b);
                        bat_destroy(b);
                }
        }
        return ok;
}

/* will be called for new idx's and when new index columns are create */
static int
create_idx(sql_trans *tr, sql_idx *ni)
{
        int ok = LOG_OK;
        sql_bat *bat = ni->data;
        int type = TYPE_int;

        if (ni->type == join_idx)
                type = TYPE_oid;

        /* create bats for a loaded idx structure */
        if (ni->base.flag == TR_OLD && !isTempTable(ni->t)){
                if (ni->type == join_idx || list_length(ni->columns) > 1 ) {
                        ni->data = load_bat( ni->t->s->base.name, 
ni->t->base.name, ni->base.name, type, ni->t->sz, &ni->t->cnt);
                }
        } else if (bat && !isTempTable(ni->t)) { 
                assert(active_store_type == store_su);
                return new_persistent_bat( tr, ni->data, ni->t->s->base.name, 
ni->t->base.name, ni->base.name, &ni->t->cnt);
        } else {
                if (ni->type == join_idx || list_length(ni->columns) > 1 ) {
                        if (!bat)
                                bat = ni->data = ZNEW(sql_bat);
                        if (!bat->bid) {
                                BAT *b = bat_new(TYPE_void, type, ni->t->sz);
                                if (!b) 
                                        return LOG_ERR;
                                bat->bid = temp_create(b);
                                bat_destroy(b);
                        }
                }
                if (ni->type == unique && 
                    ni->key != NULL && list_length(ni->key->columns)==1) {
                        sql_kc *c = ni->columns->h->data;
                        BAT *b = bind_col(tr->parent, c->c, RDONLY);

                        BATkey(BATmirror(b), BOUND2BTRUE);
                        bat_destroy(b);
                }
        }
        return ok;
}

static int
create_del(sql_trans *tr, sql_table *t)
{
        int ok = LOG_OK;
        BAT *b;
        sql_bat *bat = t->data;

        (void)tr;
        if (t->base.flag == TR_OLD && !isTempTable(t)) {
                bat = t->data = ZNEW(sql_bat);
                bat->name = sql_message("D_%s_%s", t->s->base.name, 
t->base.name);
                b = quick_descriptor(logger_find_bat(restrict_logger, 
bat->name));
                bat->bid = temp_create(b);
        } else if (bat && !isTempTable(t)) {
                assert(active_store_type == store_su);
                b = temp_descriptor(bat->bid);
                bat->bid = temp_create(b);
                bat->name = sql_message("D_%s_%s", t->s->base.name, 
t->base.name);
                (void) logger_add_bat(restrict_logger, b, bat->name);
                if (tr->parent == gtrans) {
                        /* snapshot large bats */
                        if (BATcount(b) > 1000) 
                                BATmode(b, PERSISTENT);
                        if (BATcount(b) > (size_t) REMAP_PAGE_MAXSIZE)
                                BATmmap(b, STORE_MMAP, STORE_MMAP, STORE_MMAP);
                        ok = log_bat_persists(restrict_logger, b, bat->name);
                }
                t->cnt -= BATcount(b);
                bat_destroy(b);
        } else {
                if (!bat)
                        bat = t->data = ZNEW(sql_bat);
                if (!bat->bid) {
                        b = bat_new(TYPE_void, TYPE_oid, t->sz);
                        bat->bid = temp_create(b);
                        bat_destroy(b);
                }
        }
        return ok;
}

static int 
dup_bat(sql_trans *tr, sql_table *t, sql_bat *obat, sql_bat *bat, int type, int 
oc_isnew, int c_isnew)
{
        (void)tr;
        (void)t;
        (void)type;
        (void)oc_isnew;
        (void)c_isnew;
        if (!obat)
                return LOG_OK;
        bat->bid = obat->bid;
        if (bat->bid)
                temp_dup(bat->bid);
        return LOG_OK;
}

static int 
dup_col(sql_trans *tr, sql_column *oc, sql_column *c )
{
        if (oc->data) {
                int type = c->type.type->localtype;
                sql_bat *bat = c->data = ZNEW(sql_bat), *obat = oc->data;
                return dup_bat(tr, c->t, obat, bat, type, isNew(oc), 
c->base.flag == TR_NEW);
        }
        return LOG_OK;
}

static int 
dup_idx(sql_trans *tr, sql_idx *i, sql_idx *ni )
{
        if (i->data) {
                int type = (ni->type==join_idx)?TYPE_oid:TYPE_int;
                sql_bat *bat = ni->data = ZNEW(sql_bat), *obat = i->data;
                return dup_bat(tr, ni->t, obat, bat, type, isNew(i), 
ni->base.flag == TR_NEW);
        }
        return LOG_OK;
}

static int
dup_del(sql_trans *tr, sql_table *ot, sql_table *t)
{
        sql_bat *bat = t->data = ZNEW(sql_bat), *obat = ot->data;

        bat->bid = obat->bid;
        if (bat->bid) 
                obat->bid = temp_copy(bat->bid, isTempTable(t));
        (void)tr;
        return LOG_OK;
}

/* For the single user case 
   the destroy functions should handle rollbacks of not commited changes
 */
static int
destroy_bat(sql_trans *tr, sql_bat *b, int rollback)
{
        int ok = LOG_OK;
        if (!b)
                return ok;
        if (tr && tr->parent == gtrans) {
                log_bid bid;
                if (b->bid && b->name) {
                        ok = log_bat_transient(restrict_logger, b->name);
                        bid = logger_find_bat(restrict_logger, b->name);
                        if (bid) 
                                logger_del_bat(restrict_logger, bid);
                } 
        } else {
                /* also rollback non committed changes */
                if (rollback) {
                        BAT *bd = temp_descriptor(b->bid);
                        if (b->ubid) {
                                BAT *u = temp_descriptor(b->ubid);
                                void_replace_bat(bd, u, TRUE);
                                bat_destroy(u);
                        }
                        /* rollback inserts */
                        BATundo(bd);
                        bat_destroy(bd);
                }
                if (b->name)
                        _DELETE(b->name);
                if (b->bid) 
                        temp_destroy(b->bid);
                if (b->ubid) 
                        temp_destroy(b->ubid);
                b->bid = b->ubid = 0;
                b->name = NULL;
                _DELETE(b);
        }
        return ok;
}

static int
destroy_col(sql_trans *tr, sql_column *c)
{
        int ok = destroy_bat(tr, c->data, (tr && tr->stime == c->base.wtime));
        if (!tr || tr->parent != gtrans) 
                c->data = NULL;
        return ok;
}

static int
destroy_idx(sql_trans *tr, sql_idx *i)
{
        int ok = destroy_bat(tr, i->data, (tr && tr->stime == i->base.wtime));
        if (!tr || tr->parent != gtrans) 
                i->data = NULL;
        return ok;
}

static int
destroy_del(sql_trans *tr, sql_table *t)
{
        int ok = destroy_bat(tr, t->data, (tr && tr->stime == t->base.wtime));
        if (!tr || tr->parent != gtrans) 
                t->data = NULL;
        return ok;
}

static size_t 
clear_bat(sql_trans *tr, sql_bat *bat)
{
        size_t sz = 0;

        (void)tr;
        if (bat->bid) {
                BAT *b = temp_descriptor(bat->bid);
                sz += BATcount(b);
                bat_clear(b);
                BATcommit(b);
                bat_destroy(b);
        }
        return sz;
}

static size_t 
clear_col(sql_trans *tr, sql_column *c)
{
        if (c->data && isTempTable(c->t))
                return clear_bat(tr, c->data);
        return 0;
}

static size_t
clear_idx(sql_trans *tr, sql_idx *i)
{
        if (i->data && isTempTable(i->t))
                return clear_bat(tr, i->data);
        return 0;
}

static size_t
clear_del(sql_trans *tr, sql_table *t)
{
        if (t->data && isTempTable(t))
                return clear_bat(tr, t->data);
        return 0;
}

static int 
tr_update_bat( sql_trans *tr, sql_bat *obat, sql_bat *cbat, lng *cnt)
{
        int ok = LOG_OK;
        BAT *cb = temp_descriptor(cbat->bid);

        /* handle bat swaps */
        if (cbat->bid != obat->bid) {
                temp_destroy(obat->bid);
                obat->bid = cbat->bid = temp_create(cb);
                if (tr->parent == gtrans) {
                        BATmode(cb, PERSISTENT);
                        if (BATcount(cb) > (size_t) REMAP_PAGE_MAXSIZE)
                                BATmmap(cb, STORE_MMAP, STORE_MMAP, STORE_MMAP);
                        logger_add_bat(restrict_logger, cb, obat->name);
                        log_bat_persists(restrict_logger, cb, obat->name);
                }
        /* any inserts */
        } else if (BUNlast(cb) > BUNfirst(cb)) {
                if (ok == LOG_OK && tr->parent == gtrans)
                        ok = log_bat(restrict_logger, cb, obat->name);
                BATcommit(cb);
        }

        if (cbat->ubid) {
                temp_destroy(cbat->ubid);
                BATcommit(cb);
                cbat->ubid = 0;
        }
        *cnt = BATcount(cb);
        bat_destroy(cb);
        return ok;
}

static int
update_table(sql_trans *tr, sql_table *ft, sql_table *tt)
{
        int ok = LOG_OK;
        node *n, *m;
        lng deleted = 0;

        if (ft->cleared) {
                (void)store_funcs.clear_del(tr->parent, tt);
                for (n = tt->columns.set->h; n; n = n->next) 
                        (void)store_funcs.clear_col(tr->parent, n->data);
                if (tt->idxs.set) 
                        for (n = tt->idxs.set->h; n; n = n->next) 
                                (void)store_funcs.clear_idx(tr->parent, 
n->data);
        }
        tr_update_bat(tr, tt->data, ft->data, &deleted);
        for (n = ft->columns.set->h, m = tt->columns.set->h; ok == LOG_OK && n 
&& m; n = n->next, m = m->next) {
                sql_column *cc = n->data;
                sql_column *oc = m->data;
                sql_bat *cbat = cc->data;

                /* for cleared tables the bid is reset */
                if (cbat->bid == 0) {
                        sql_bat *obat = oc->data;
                        cbat->bid = obat->bid;
                        temp_dup(cbat->bid);
                }
                if (!cc->base.wtime) 
                        continue;
                tr_update_bat(tr, oc->data, cc->data, &cc->t->cnt);
                tt->cnt = cc->t->cnt;

                if (cc->base.rtime)
                        oc->base.rtime = tr->stime;
                oc->base.wtime = tr->stime;
                cc->base.rtime = cc->base.wtime = 0;
        }
        if (ok == LOG_OK && tt->idxs.set) {
                for (n = ft->idxs.set->h, m = tt->idxs.set->h; ok == LOG_OK && 
n && m; n = n->next, m = m->next) {
                        sql_idx *ci = n->data;
                        sql_idx *oi = m->data;
                        sql_bat *cbat = ci->data;
                        sql_bat *obat = oi->data;

                        /* some indices have no bats */
                        if (!obat)
                                continue;

                        /* for cleared tables the bid is reset */
                        if (cbat->bid == 0) {
                                cbat->bid = obat->bid;
                                temp_dup(cbat->bid);
                        }
                        if (!ci->base.wtime) 
                                continue;
                        tr_update_bat(tr, oi->data, ci->data, &ci->t->cnt);

                        if (ci->base.rtime)
                                oi->base.rtime = tr->stime;
                        oi->base.wtime = tr->stime;
                        ci->base.rtime = ci->base.wtime = 0;
                }
        }
        tt->cnt -= deleted;
        return ok;
}

int
su_storage_init( store_functions *sf)
{
        sf->bind_col = (bind_col_fptr)&bind_col;
        sf->bind_idx = (bind_idx_fptr)&bind_idx;
        sf->bind_del = (bind_del_fptr)&bind_del;

        sf->append_col = (append_col_fptr)&append_col;
        sf->append_idx = (append_idx_fptr)&append_idx;
        sf->update_col = (update_col_fptr)&update_col;
        sf->update_idx = (update_idx_fptr)&update_idx;
        sf->delete_tab = (delete_tab_fptr)&delete_tab;

        sf->create_col = (create_col_fptr)&create_col;
        sf->create_idx = (create_idx_fptr)&create_idx;
        sf->create_del = (create_del_fptr)&create_del;

        sf->dup_col = (dup_col_fptr)&dup_col;
        sf->dup_idx = (dup_idx_fptr)&dup_idx;
        sf->dup_del = (dup_del_fptr)&dup_del;

        sf->destroy_col = (destroy_col_fptr)&destroy_col;
        sf->destroy_idx = (destroy_idx_fptr)&destroy_idx;
        sf->destroy_del = (destroy_del_fptr)&destroy_del;

        sf->clear_col = (clear_col_fptr)&clear_col;
        sf->clear_idx = (clear_idx_fptr)&clear_idx;
        sf->clear_del = (clear_del_fptr)&clear_del;

        sf->update_table = (update_table_fptr)&update_table;
        return LOG_OK;
}

int
ro_storage_init( store_functions *sf)
{
        sf->bind_col = (bind_col_fptr)&bind_col;
        sf->bind_idx = (bind_idx_fptr)&bind_idx;
        sf->bind_del = (bind_del_fptr)&bind_del;

        sf->append_col = (append_col_fptr)NULL;
        sf->append_idx = (append_idx_fptr)NULL;
        sf->update_col = (update_col_fptr)NULL;
        sf->update_idx = (update_idx_fptr)NULL;
        sf->delete_tab = (delete_tab_fptr)NULL;

        sf->create_col = (create_col_fptr)&create_col;
        sf->create_idx = (create_idx_fptr)&create_idx;
        sf->create_del = (create_del_fptr)&create_del;

        sf->dup_col = (dup_col_fptr)&dup_col;
        sf->dup_idx = (dup_idx_fptr)&dup_idx;
        sf->dup_del = (dup_del_fptr)&dup_del;

        sf->destroy_col = (destroy_col_fptr)&destroy_col;
        sf->destroy_idx = (destroy_idx_fptr)&destroy_idx;
        sf->destroy_del = (destroy_del_fptr)&destroy_del;

        sf->clear_col = (clear_col_fptr)&clear_col;
        sf->clear_idx = (clear_idx_fptr)&clear_idx;
        sf->clear_del = (clear_del_fptr)&clear_del;

        sf->update_table = (update_table_fptr)NULL;
        return LOG_OK;
}

int
suro_storage_init( store_functions *sf)
{
        return ro_storage_init(sf);
}

--- NEW FILE: restrict_logger.mx ---
@' The contents of this file are subject to the MonetDB Public License
@' Version 1.1 (the "License"); you may not use this file except in
@' compliance with the License. You may obtain a copy of the License at
@' http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
@'
@' Software distributed under the License is distributed on an "AS IS"
@' basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
@' License for the specific language governing rights and limitations
@' under the License.
@'
@' The Original Code is the MonetDB Database System.
@'
@' The Initial Developer of the Original Code is CWI.
@' Portions created by CWI are Copyright (C) 1997-2007 CWI.
@' All Rights Reserved.

@f restrict_logger
@h
#ifndef RESTRICT_LOGGER_H
#define RESTRICT_LOGGER_H

#include "sql_storage.h"
#include <gdk_logger.h>

extern logger *restrict_logger;

extern int su_logger_init( logger_functions *lf );
extern int ro_logger_init( logger_functions *lf );
extern int suro_logger_init( logger_functions *lf );

#endif /*RESTRICT_LOGGER_H */
@c
#include "sql_config.h"
#include "restrict_logger.h"

logger *restrict_logger = NULL;

static int 
bl_create(char *logdir, char *dbname, int catalog_version)
{
        if (restrict_logger)
                return LOG_ERR;
        restrict_logger = logger_create(0, "sql", logdir, dbname, 
catalog_version);
        if (restrict_logger)
                return LOG_OK;
        return LOG_ERR;
}

static void 
bl_destroy(void)
{
        logger *l = restrict_logger;

        restrict_logger = NULL;
        if (l) {
                logger_exit(l);
                logger_destroy(l);
        }
}

static int 
bl_restart(void)
{
        if (restrict_logger)
                return logger_restart(restrict_logger);
        return LOG_OK;
}

static int
bl_cleanup(void)
{
        if (restrict_logger)
                return logger_cleanup(restrict_logger);
        return LOG_OK;
}

static int
bl_changes(void)
{       
        return logger_changes(restrict_logger);
}

static int 
ro_restart(void)
{
        assert(0);
        return LOG_OK;
}

static int
ro_cleanup(void)
{
        assert(0);
        return LOG_OK;
}

static int
ro_changes(void)
{       
        int c = logger_changes(restrict_logger);
        assert(c==0);
        return c;
}

static int 
bl_get_sequence(int seq, lng *id)
{
        return logger_sequence(restrict_logger, seq, id);
}

static int
bl_log_isnew(void)
{
        if (BATcount(restrict_logger->catalog) > 10) {
                return 0;
        }
        return 1;
}

static int 
bl_tstart(void)
{
        return log_tstart(restrict_logger);
}

static int 
bl_tend(void)
{
        return log_tend(restrict_logger);
}

static int 
bl_sequence(int seq, lng id)
{
        return log_sequence(restrict_logger, seq, id);
}

static int 
ro_tstart(void)
{
        assert(0);
        return 0;
}

static int 
ro_tend(void)
{
        assert(0);
        return 0;
}

static int 
ro_sequence(int seq, lng id)
{
        assert(0);
        (void)seq;
        (void)id;
        return 0;
}

int 
su_logger_init( logger_functions *lf )
{
        lf->create = bl_create;
        lf->destroy = bl_destroy;
        lf->restart = bl_restart;
        lf->cleanup = bl_cleanup;
        lf->changes = bl_changes;
        lf->get_sequence = bl_get_sequence;
        lf->log_isnew = bl_log_isnew;
        lf->log_tstart = bl_tstart;
        lf->log_tend = bl_tend;
        lf->log_sequence = bl_sequence;
        return LOG_OK;
}

int 
ro_logger_init( logger_functions *lf )
{
        lf->create = bl_create;
        lf->destroy = bl_destroy;
        lf->restart = ro_restart;
        lf->cleanup = ro_cleanup;
        lf->changes = ro_changes;
        lf->get_sequence = bl_get_sequence;
        lf->log_isnew = bl_log_isnew;
        lf->log_tstart = ro_tstart;
        lf->log_tend = ro_tend;
        lf->log_sequence = ro_sequence;
        return LOG_OK;
}

int 
suro_logger_init( logger_functions *lf )
{
        return ro_logger_init(lf);
}


-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://ad.doubleclick.net/clk;164216239;13503038;w?http://sf.net/marketplace
_______________________________________________
Monetdb-sql-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-sql-checkins

Reply via email to