Revision: 489
          http://vde.svn.sourceforge.net/vde/?rev=489&view=rev
Author:   danielel
Date:     2011-04-08 11:43:36 +0000 (Fri, 08 Apr 2011)

Log Message:
-----------
Added branch for wirefilter2 experimental code. WIP.

Modified Paths:
--------------
    branches/danielinux-wirefilter2/src/wirefilter.c

Added Paths:
-----------
    branches/danielinux-wirefilter2/

Modified: branches/danielinux-wirefilter2/src/wirefilter.c
===================================================================
--- trunk/vde-2/src/wirefilter.c        2011-04-05 10:05:11 UTC (rev 488)
+++ branches/danielinux-wirefilter2/src/wirefilter.c    2011-04-08 11:43:36 UTC 
(rev 489)
@@ -1,11 +1,11 @@
-/* WIREFILTER (C) 2005 Renzo Davoli
+/* WIREFILTER2 (C) 2011 Renzo Davoli, Daniele Lacamera
  * Licensed under the GPLv2
+ * Based on "wirefilter" by Renzo Davoli 
  * Modified by Ludovico Gardenghi 2005
  * Modified by Renzo Davoli, Luca Bigliardi 2007
- * Modified by Renzo Davoli, Luca Raggi 2009 (Markov chain support)
- * Gauss normal distribution/blinking support, requested and parlty implemented
- * by Luca Saiu and Jean-Vincent Loddo (Marionnet project)
- * Gilbert model for packet loss requested by Leandro Galvao.
+ * Modified by Renzo Davoli, Luca Raggi 2009
+ * Some implementation by:
+ * Luca Saiu and Jean-Vincent Loddo (Marionnet project)
  *
  * This filter can be used for testing network protcols. 
  * It is possible to loose, delay or reorder packets.
@@ -82,6 +82,17 @@
 #define MTU 8
 #define NUMVALUES 9
 
+#define BUFSIZE 2048
+#define MAXCMD 128
+#define MGMTMODEARG 129
+#define DAEMONIZEARG 130
+#define PIDFILEARG 131
+#define LOGSOCKETARG 132
+#define LOGIDARG 133
+#define KILO (1<<10)
+#define MEGA (1<<20)
+#define GIGA (1<<30)
+
 /* general Markov chain approach */
 int markov_numnodes=0;
 int markov_current=0;
@@ -121,6 +132,172 @@
 static char *blinkmsg;
 static char blinkidlen;
 
+static inline unsigned long long
+gettimeofdayms(void) {
+       struct timeval tv;
+       gettimeofday(&tv, 0);
+       return (unsigned long long) tv.tv_sec * 1000ULL + (unsigned long long) 
tv.tv_usec / 1000;
+}
+
+/*more than 98% inside the bell */
+#define SIGMA (1.0/3.0)
+static double compute_wirevalue(int tag, int dir)
+{
+       struct wirevalue *wv=&WFVAL(markov_current,tag,dir);
+       if (wv->plus == 0)
+               return wv->value;
+       switch (wv->alg) {
+               case ALGO_UNIFORM:
+                       return wv->value+wv->plus*((drand48()*2.0)-1.0);
+               case ALGO_GAUSS_NORMAL:
+                       {
+                               double x,y,r2;
+                               do {
+                                       x = (2*drand48())-1;
+                                       y = (2*drand48())-1;
+                                       r2=x*x+y*y;
+                               } while (r2 >= 1.0);
+                               return wv->value+wv->plus* SIGMA * x * sqrt ( 
(-2 * log(r2)) /r2);
+                       }
+               default:
+                       return 0.0;
+       }
+}
+
+
+/*** WF 2 ***/
+
+struct wf_packet {
+       struct wf_packet *next;
+       unsigned char payload[BUFSIZE];
+       unsigned short size;
+       unsigned long long dequeue_time;
+       int dir;
+};
+
+static unsigned long outqueue_delay;
+static struct wf_packet *wf_queue_in[2];
+static struct wf_packet *wf_queue_out[2];
+
+int queue_size(struct wf_packet *p) {
+       int n = 0;
+       while(p){
+               n++;
+               p=p->next;
+       }
+       return n;
+}
+
+static struct wf_packet *_pkt_enqueue(struct wf_packet *q, struct wf_packet 
*pkt)
+{
+       if (!q)
+               return pkt;
+
+       if (pkt->dequeue_time < q->dequeue_time) {
+               pkt->next = q;
+               return pkt;
+       }
+       q->next = _pkt_enqueue(q->next, pkt);
+       return q;
+}
+
+static void pkt_enqueue_in(struct wf_packet *pkt)
+{
+       struct wf_packet *q = wf_queue_in[pkt->dir];
+       pkt->next = NULL;
+       wf_queue_in[pkt->dir] = _pkt_enqueue(q, pkt);
+       fprintf(stderr,"enqueued[%d]. Size now: %d\n", pkt->dir, 
queue_size(wf_queue_in[pkt->dir]));
+       
+}
+
+static void pkt_enqueue_out(struct wf_packet *pkt)
+{
+       struct wf_packet *q = wf_queue_out[pkt->dir];
+       pkt->next = NULL;
+       wf_queue_out[pkt->dir] = _pkt_enqueue(q, pkt);
+       fprintf(stderr,"============= OUT =========== enqueued[%d]. Size now: 
%d\n", pkt->dir, queue_size(wf_queue_out[pkt->dir]));
+}
+
+static int is_time_to_dequeue(int dir)
+{
+       unsigned long long now = gettimeofdayms();
+       if (wf_queue_in[dir]) 
+               return (now >= wf_queue_in[dir]->dequeue_time);
+       else return 0;
+}
+
+static int process_queue_out(void)
+{
+       static unsigned long long now, last_out[2] = {0ULL, 0ULL};
+       struct wf_packet *pkt;
+       int i, count[2] = {0}, old_count[2] = {0};
+       do {
+               old_count[0] = count[0];
+               old_count[1] = count[1];
+               for (i = 0; i < 2; i++) {
+                       double bandval;
+                       pkt = wf_queue_out[i];
+                       if (!pkt)
+                               continue;
+                       bandval = compute_wirevalue(BAND,i);
+                       if (bandval == 0) {
+                               writepacket(i, pkt->payload, pkt->size);
+                               wf_queue_out[i] = pkt->next;
+                               count[i] += pkt->size;
+                               last_out[i] = gettimeofdayms(); 
+                               free(pkt);
+                       } else {
+                               now = gettimeofdayms();
+                               pkt->dequeue_time = (unsigned long long) 
((double)last_out[i] + (((double)(pkt->size + count[i])*1000) / bandval));
+                               if (now >= pkt->dequeue_time) {
+                                       writepacket(i, pkt->payload, pkt->size);
+                                       wf_queue_out[i] = pkt->next;
+                                       count[i] += pkt->size;
+                                       last_out[i] = now; 
+                                       free(pkt);
+                               }
+                       }
+               }
+       } while (count[0] > old_count[0] || count[1] > old_count[1]);
+
+       if (count[0] > 0)
+               fprintf(stderr,">>------------> OUT process queue: %d bytes 
transferred\n", count[0]);
+       if (count[1] > 0)
+               fprintf(stderr,"<------------<< OUT process queue: %d bytes 
transferred\n", count[1]);
+       return count[0] + count[1];
+}
+
+static int process_queue_in(void)
+{
+       struct wf_packet *p;
+       int i, count = 0;
+       for (i = 0; i < 2; i++) {
+               if(is_time_to_dequeue(i)) {
+                       p = wf_queue_in[i];
+                       wf_queue_in[i] = p->next;
+                       pkt_enqueue_out(p);
+                       count++;
+               }
+       }
+       if (count > 0)
+               fprintf(stderr,"process queue: %d packets transferred\n", 
count);
+       return count;
+} 
+
+static struct wf_packet 
+*pkt_discard(struct wf_packet *q, struct wf_packet *pkt)
+{
+       if (!q)
+               return NULL;
+       if (pkt == q) {
+               free(pkt);
+               return q->next;
+       } else 
+               q->next = pkt_discard(q->next, pkt);
+       return q;
+}
+
+
 static void printoutc(int fd, const char *format, ...);
 /* markov node mgmt */
 static inline struct markov_node *markov_node_new(void)
@@ -235,16 +412,6 @@
        }
 }
 
-#define BUFSIZE 2048
-#define MAXCMD 128
-#define MGMTMODEARG 129
-#define DAEMONIZEARG 130
-#define PIDFILEARG 131
-#define LOGSOCKETARG 132
-#define LOGIDARG 133
-#define KILO (1<<10)
-#define MEGA (1<<20)
-#define GIGA (1<<30)
 
 static inline double max_wirevalue(int node,int tag, int dir)
 {
@@ -263,30 +430,6 @@
        srand48(v.tv_sec ^ v.tv_usec ^ getpid());
 }
 
-/*more than 98% inside the bell */
-#define SIGMA (1.0/3.0)
-static double compute_wirevalue(int tag, int dir)
-{
-       struct wirevalue *wv=&WFVAL(markov_current,tag,dir);
-       if (wv->plus == 0)
-               return wv->value;
-       switch (wv->alg) {
-               case ALGO_UNIFORM:
-                       return wv->value+wv->plus*((drand48()*2.0)-1.0);
-               case ALGO_GAUSS_NORMAL:
-                       {
-                               double x,y,r2;
-                               do {
-                                       x = (2*drand48())-1;
-                                       y = (2*drand48())-1;
-                                       r2=x*x+y*y;
-                               } while (r2 >= 1.0);
-                               return wv->value+wv->plus* SIGMA * x * sqrt ( 
(-2 * log(r2)) /r2);
-                       }
-               default:
-                       return 0.0;
-       }
-}
 
 void printlog(int priority, const char *format, ...)
 {
@@ -544,52 +687,110 @@
        }
 }
 
-void handle_packet(int dir,const unsigned char *buf,int size)
+unsigned long time_in_outqueue(dir)
 {
+       unsigned long bytes_in_queue = 0;
+       struct wf_packet *pkt = wf_queue_out[dir];
+       double bw_val = max_wirevalue(markov_current,BAND,dir);
+       unsigned long timetogo = 0;
+
+       if (!bw_val) {
+               return 0;
+       }
+       while(pkt) {
+               bytes_in_queue += pkt->size;
+               pkt = pkt->next;
+       } 
+       timetogo = 1000 * (bytes_in_queue / bw_val);
+       fprintf(stderr,"Time that will be spent in out queue: %lu ms (queue 
size: %lu B, speed: %.2f B/s)\n", 
+                       timetogo, bytes_in_queue, bw_val);
+       return timetogo;
+}
+
+void set_ingres_delay(struct wf_packet *pkt)
+{
+       int banddelay = 
+       pkt->dequeue_time = 0;
+       if (banddelay >= 0) {
+               if (banddelay > 0 || 
max_wirevalue(markov_current,DELAY,pkt->dir) > 0) {
+                       double delval=compute_wirevalue(DELAY,pkt->dir);
+                       delval=(delval >= 0)?delval+banddelay:banddelay;
+                       if (delval > 0) {
+                               struct timeval tv;
+                               unsigned long long now = gettimeofdayms();
+                               pkt->dequeue_time = now + delval - 
time_in_outqueue(pkt->dir); 
+                       }
+               }
+       }
+}
+
+void handle_packet(struct wf_packet *pkt)
+{
+       int times=1;
        /* MTU */
        /* if the packet is incosistent with the MTU of the line just drop it */
-       if (min_wirevalue(markov_current,MTU,dir) > 0 && size > 
min_wirevalue(markov_current,MTU,dir))
+       if (min_wirevalue(markov_current,MTU,pkt->dir) > 0 && pkt->size > 
min_wirevalue(markov_current,MTU,pkt->dir)) {
+               free(pkt);
                return;
+       }
 
        /* LOSS */
        /* Total packet loss */
-       if (min_wirevalue(markov_current,LOSS,dir) >= 100.0)
+       if (min_wirevalue(markov_current,LOSS,pkt->dir) >= 100.0) {
+               free(pkt);
                return;
+       }
        /* probabilistic loss */
-       if (max_wirevalue(markov_current,LOSTBURST,dir) > 0) {
+       if (max_wirevalue(markov_current,LOSTBURST,pkt->dir) > 0) {
                /* Gilbert model */
-               double losval=compute_wirevalue(LOSS,dir)/100;
-               double burstlen=compute_wirevalue(LOSTBURST,dir);
+               double losval=compute_wirevalue(LOSS,pkt->dir)/100;
+               double burstlen=compute_wirevalue(LOSTBURST,pkt->dir);
                double alpha=losval / (burstlen*(1-losval));
                double beta=1.0 / burstlen;
-               switch (loss_status[dir]) {
+               switch (loss_status[pkt->dir]) {
                        case OK_BURST:
-                               if (drand48() < alpha) 
loss_status[dir]=FAULTY_BURST;
+                               if (drand48() < alpha) 
loss_status[pkt->dir]=FAULTY_BURST;
                                break;
                        case FAULTY_BURST:
-                               if (drand48() < beta) loss_status[dir]=OK_BURST;
+                               if (drand48() < beta) 
loss_status[pkt->dir]=OK_BURST;
                                break;
                }
-               if (loss_status[dir] != OK_BURST)
+               if (loss_status[pkt->dir] != OK_BURST) {
+                       free(pkt);
                        return;
+               }
        } else {
-               loss_status[dir] = OK_BURST;
-               if (max_wirevalue(markov_current,LOSS,dir) > 0) {
+               loss_status[pkt->dir] = OK_BURST;
+               if (max_wirevalue(markov_current,LOSS,pkt->dir) > 0) {
                        /* standard non bursty model */
-                       double losval=compute_wirevalue(LOSS,dir)/100;
-                       if (drand48() < losval)
+                       double losval=compute_wirevalue(LOSS,pkt->dir)/100;
+                       if (drand48() < losval) {
+                               free(pkt);
                                return;
+                       }
                }
        }
 
        /* DUP */
        /* times is the number of dup packets */
-       int times=1;
-       if (max_wirevalue(markov_current,DDUP,dir) > 0) {
-               double dupval=compute_wirevalue(DDUP,dir)/100;
+       if (max_wirevalue(markov_current,DDUP,pkt->dir) > 0) {
+               double dupval=compute_wirevalue(DDUP,pkt->dir)/100;
                while (drand48() < dupval)
                        times++;
        }
+       while (times > 0) {
+               struct wf_packet *pkt_in;
+               if (times > 1) { 
+                       pkt_in = malloc(sizeof(struct wf_packet)); 
+                       memcpy(pkt_in, pkt, sizeof(struct wf_packet));
+               } else
+                       pkt_in = pkt;
+               set_ingres_delay(pkt_in);
+               pkt_enqueue_in(pkt_in);
+               times--;
+       }
+
+#if 0
        while (times>0) {
                int banddelay=0;
 
@@ -657,71 +858,84 @@
                }
                times--;
        }
+#endif
 }
 
 #define MIN(X,Y) (((X)<(Y))?(X):(Y))
 
-static void splitpacket(const unsigned char *buf,int size,int dir)
+static void splitpacket(struct wf_packet *pkt)
 {
        static unsigned char fragment[BUFSIZE][2];
        static unsigned char *fragp[2];
        static unsigned int rnx[2],remaining[2];
+       unsigned short size = pkt->size;
 
        //fprintf(stderr,"%s: splitpacket rnx=%d remaining=%d 
size=%d\n",progname,rnx[dir],remaining[dir],size);
-       if (size==0) return;
-       if (rnx[dir]>0) {
-               register int amount=MIN(remaining[dir],size);
+       if (pkt->size==0) return;
+       if (rnx[pkt->dir]>0) {
+               register int amount=MIN(remaining[pkt->dir],pkt->size);
                //fprintf(stderr,"%s: fragment amount %d\n",progname,amount);
-               memcpy(fragp[dir],buf,amount);
-               remaining[dir]-=amount;
-               fragp[dir]+=amount;
-               buf+=amount;
+               memcpy(fragp[pkt->dir],pkt->payload,amount);
+               remaining[pkt->dir]-=amount;
+               fragp[pkt->dir]+=amount;
                size-=amount;
-               if (remaining[dir]==0) {
+               if (remaining[pkt->dir]==0) {
                        //fprintf(stderr,"%s: delivered defrag 
%d\n",progname,rnx[dir]);
-                       handle_packet(dir,fragment[dir],rnx[dir]+2);
-                       rnx[dir]=0;
+                       pkt->size = rnx[pkt->dir]+2;
+                       memcpy(pkt->payload, fragment[pkt->dir], 
rnx[pkt->dir]+2);
+                       handle_packet(pkt);
+                       rnx[pkt->dir]=0;
                }
        }
        while (size > 0) {
-               rnx[dir]=(buf[0]<<8)+buf[1];
-               //fprintf(stderr,"%s: packet %d size %d %x %x dir 
%d\n",progname,rnx[dir],size-2,buf[0],buf[1],dir);
-               if (rnx[dir]>1521) {
-                       printlog(LOG_WARNING,"Packet length error size %d rnx 
%d",size,rnx[dir]);
-                       rnx[dir]=0;
+               rnx[pkt->dir]=(pkt->payload[0]<<8)+pkt->payload[1];
+               //fprintf(stderr,"%s: packet %d pkt->size %d %x %x pkt->dir 
%d\n",progname,rnx[pkt->dir],pkt->size-2,pkt->payload[0],pkt->payload[1],pkt->dir);
+               if (rnx[pkt->dir]>1521) {
+                       printlog(LOG_WARNING,"Packet length error pkt->size %d 
rnx %d",pkt->size,rnx[pkt->dir]);
+                       rnx[pkt->dir]=0;
                        return;
                }
-               if (rnx[dir]+2 > size) {
-                       //fprintf(stderr,"%s: begin defrag 
%d\n",progname,rnx[dir]);
-                       fragp[dir]=fragment[dir];
-                       memcpy(fragp[dir],buf,size);
-                       remaining[dir]=rnx[dir]+2-size;
-                       fragp[dir]+=size;
+               if (rnx[pkt->dir]+2 > size) {
+                       //fprintf(stderr,"%s: begin defrag 
%d\n",progname,rnx[pkt->dir]);
+                       fragp[pkt->dir]=fragment[pkt->dir];
+                       memcpy(fragp[pkt->dir],pkt->payload,pkt->size);
+                       remaining[pkt->dir]=rnx[pkt->dir]+2-size;
+                       fragp[pkt->dir]+=size;
                        size=0;
                } else {
-                       handle_packet(dir,buf,rnx[dir]+2);
-                       buf+=rnx[dir]+2;
-                       size-=rnx[dir]+2;
-                       rnx[dir]=0;
+                       pkt->size = rnx[pkt->dir]+2;
+                       handle_packet(pkt);
+                       size-=rnx[pkt->dir]+2;
+                       rnx[pkt->dir]=0;
                }
        }
 }
+
+       
                                        
-static void packet_in(int dir)
+static int packet_in(int dir)
 {
-       unsigned char buf[BUFSIZE];
+       struct wf_packet *pkt;
        int n;
+
+       pkt = malloc(sizeof(struct wf_packet));
+       pkt->next = NULL;
+       pkt->dir = dir;
        if(vdeplug[dir]) {
-               n=vde_recv(vdeplug[dir],buf+2,BUFSIZE-2,0);
-               buf[0]=n>>8;
-               buf[1]=n&0xFF;
-               handle_packet(dir,buf,n+2);
+               n=vde_recv(vdeplug[dir],pkt->payload + 2,BUFSIZE-2,0);
+               pkt->payload[0]=n>>8;
+               pkt->payload[1]=n&0xFF;
+               pkt->size = (unsigned short)n + 2;
+               handle_packet(pkt);
        } else {
-               n=read(pfd[dir].fd,buf,BUFSIZE);
-               if (n == 0)
+               n = read(pfd[dir].fd,pkt->payload,BUFSIZE);
+               if (n <= 0)
                        exit (0);
-               splitpacket(buf,n,dir);
+                       pkt->size = (unsigned short)n;
+               splitpacket(pkt);
        }
+       fprintf(stderr,"Packet In: %d\n",n);
+       return n;
 }
 
 static int check_open_fifos_n_plugs(struct pollfd *pfd,int *outfd,char 
*vdepath[],VDECONN *vdeplug[])
@@ -1666,7 +1880,7 @@
                                }
                        }
                }
-               n=poll(pfd,npfd,delay);
+               n=poll(pfd,npfd,1);
                if (pfd[0].revents & POLLHUP || (ndirs>1 && pfd[1].revents & 
POLLHUP))
                        exit(0);
                if (pfd[0].revents & POLLIN) {
@@ -1700,6 +1914,8 @@
                                exit(0);*/
                }
                markov_try();
-               packet_dequeue();
+               //packet_dequeue();
+               process_queue_out();
+               process_queue_in();
        }
 }


This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.

------------------------------------------------------------------------------
Xperia(TM) PLAY
It's a major breakthrough. An authentic gaming
smartphone on the nation's most reliable network.
And it wants your games.
http://p.sf.net/sfu/verizon-sfdev
_______________________________________________
vde-users mailing list
vde-users@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/vde-users

Reply via email to