Changeset: ed10194a1320 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ed10194a1320
Modified Files:
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:

Add window strides
First step towards slowly moving windows based on
row count and time


diffs (129 lines):

diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -26,7 +26,7 @@
  */
 
 #include "monetdb_config.h"
-#include <gdk.h>
+#include "gdk.h"
 #include "iot.h"
 #include "basket.h"
 #include "mal_exception.h"
@@ -36,7 +36,7 @@
 //#define _DEBUG_BASKET_ if(0)
 #define _DEBUG_BASKET_ 
 
-str statusname[4] = { "<unknown>", "running", "paused", "locked" };
+str statusname[4] = { "<unknown>", "active", "paused", "locked" };
 
 BasketRec *baskets;   /* the global iot catalog */
 static int bsktTop = 0, bsktLimit = 0;
@@ -531,12 +531,21 @@ recover:
 }
 
 /* remove tuples from a basket according to the sliding policy */
+#define ColumnShift(B,TPE, STRIDE) { /* slide the tail values of B, read as TPE, STRIDE slots toward the front; expands to a bare { } block, not do{}while(0) */ \
+       TPE *first= (TPE*) Tloc(B, BUNfirst(B)); /* write cursor, starts at Tloc(B, BUNfirst(B)) - presumably the first tail value, confirm against gdk.h */\
+       TPE *n = first+STRIDE; /* read cursor, STRIDE elements ahead of the write cursor; STRIDE is pasted unparenthesized, so pass a simple expression */\
+       TPE *last=  (TPE*) Tloc(B, BUNlast(B)); /* exclusive end bound used by the copy loop below */\
+       for( ; n < last; n++, first++) /* copies nothing when the read cursor already starts at or beyond last */\
+               *first=*n; /* in-place overwrite; the element count of B is not adjusted by this macro */\
+}
+
 str
 BSKTfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
     str sch = *getArgReference_str(stk, pci, 1);
     str tbl = *getArgReference_str(stk, pci, 2);
        BAT *b;
+       BUN cnt, stride;
        node *n;
        mvc *m = NULL;
        str msg;
@@ -547,12 +556,71 @@ BSKTfinish(Client cntxt, MalBlkPtr mb, M
        (void) pci;
 
        msg= getSQLContext(cntxt,NULL, &m, NULL);
+       if( msg )
+               throw(SQL,"iot.finish","Missing SQL context");
     bskt = BSKTlocate(sch,tbl);
        if (bskt == 0)
                throw(SQL, "iot.finish", "Could not find the basket 
%s.%s",sch,tbl);
 
-       if( msg ==MAL_SUCCEED)
-       /* reset all stream BATs to empty*/
+       /* window stride option */
+       if( baskets[bskt].winsize && (stride =baskets[bskt].winstride)){
+shiftcolumns:
+               for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+                       sql_column *c = n->data;
+                       b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+                       assert( b );
+                       cnt=BATcount(b);
+                       if( cnt < stride)
+                               break;
+
+                       switch(ATOMstorage(b->ttype)){
+                       case TYPE_bit:ColumnShift(b,bit,stride); break;
+                       case TYPE_bte:ColumnShift(b,bte,stride); break;
+                       case TYPE_sht:ColumnShift(b,sht,stride); break;
+                       case TYPE_int:ColumnShift(b,int,stride); break;
+                       case TYPE_oid:ColumnShift(b,oid,stride); break;
+                       case TYPE_flt:ColumnShift(b,flt,stride); break;
+                       case TYPE_dbl:ColumnShift(b,dbl,stride); break;
+                       case TYPE_lng:ColumnShift(b,lng,stride); break;
+#ifdef HAVE_HGE
+                       case TYPE_hge:ColumnShift(b,hge,stride); break;
+#endif
+                       case TYPE_str:
+                               switch(b->T->width){
+                               case 1: ColumnShift(b,bte,stride); break;
+                               case 2: ColumnShift(b,sht,stride); break;
+                               case 4: ColumnShift(b,int,stride); break;
+                               case 8: ColumnShift(b,lng,stride); break;
+                               }
+                                       break;
+                       default: break;
+                       }
+                       BATsetcount(b, BATcount(b)-stride);
+                       BBPunfix(b->batCacheid);
+               }
+               return MAL_SUCCEED;
+       }
+       
+       /* time stride option, prepare a new stride based on the leading 'iotclk' */
+       if( baskets[bskt].timeslice && baskets[bskt].timestride){
+               sql_column *c;
+               lng *first, *last, stop;
+               n = baskets[bskt].table->columns.set->h; 
+               c = n->data;
+               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               assert( b );
+               if( b->ttype !=TYPE_timestamp)
+                       throw(SQL, "iot.finish", "Could not find the leading 
'iotclk' in %s.%s",sch,tbl);
+               first= (lng*) Tloc(b, BUNfirst(b));
+               last = (lng*) Tloc(b, BUNlast(b));
+               stride =0;
+               stop = *first + baskets[bskt].timestride;
+               for( ; first < last; first++)
+                       if (*first >stop) break;
+               goto shiftcolumns;
+       }
+
+       /* default action: reset all stream BATs to empty*/
        for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
                sql_column *c = n->data;
                b = store_funcs.bind_col(m->session->tr,c,RDONLY);
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -48,7 +48,7 @@
 
 #define MAXPN 200           /* it is the minimum, if we need more space GDKrealloc */
 
-static str statusname[6] = { "<unknown>", "running", "paused"};
+static str statusname[6] = { "<unknown>", "active", "paused"};
 
 static void
 PNstartScheduler(void);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to