Revision: 489 http://vde.svn.sourceforge.net/vde/?rev=489&view=rev Author: danielel Date: 2011-04-08 11:43:36 +0000 (Fri, 08 Apr 2011)
Log Message: ----------- Added branch for wirefilter2 experimental code. WIP. Modified Paths: -------------- branches/danielinux-wirefilter2/src/wirefilter.c Added Paths: ----------- branches/danielinux-wirefilter2/ Modified: branches/danielinux-wirefilter2/src/wirefilter.c =================================================================== --- trunk/vde-2/src/wirefilter.c 2011-04-05 10:05:11 UTC (rev 488) +++ branches/danielinux-wirefilter2/src/wirefilter.c 2011-04-08 11:43:36 UTC (rev 489) @@ -1,11 +1,11 @@ -/* WIREFILTER (C) 2005 Renzo Davoli +/* WIREFILTER2 (C) 2011 Renzo Davoli, Daniele Lacamera * Licensed under the GPLv2 + * Based on "wirefilter" by Renzo Davoli * Modified by Ludovico Gardenghi 2005 * Modified by Renzo Davoli, Luca Bigliardi 2007 - * Modified by Renzo Davoli, Luca Raggi 2009 (Markov chain support) - * Gauss normal distribution/blinking support, requested and parlty implemented - * by Luca Saiu and Jean-Vincent Loddo (Marionnet project) - * Gilbert model for packet loss requested by Leandro Galvao. + * Modified by Renzo Davoli, Luca Raggi 2009 + * Some implementation by: + * Luca Saiu and Jean-Vincent Loddo (Marionnet project) * * This filter can be used for testing network protcols. * It is possible to loose, delay or reorder packets. @@ -82,6 +82,17 @@ #define MTU 8 #define NUMVALUES 9 +#define BUFSIZE 2048 +#define MAXCMD 128 +#define MGMTMODEARG 129 +#define DAEMONIZEARG 130 +#define PIDFILEARG 131 +#define LOGSOCKETARG 132 +#define LOGIDARG 133 +#define KILO (1<<10) +#define MEGA (1<<20) +#define GIGA (1<<30) + /* general Markov chain approach */ int markov_numnodes=0; int markov_current=0; @@ -121,6 +132,172 @@ static char *blinkmsg; static char blinkidlen; +static inline unsigned long long +gettimeofdayms(void) { + struct timeval tv; + gettimeofday(&tv, 0); + return (unsigned long long) tv.tv_sec * 1000ULL + (unsigned long long) tv.tv_usec / 1000; +} + +/*more than 98% inside the bell */ +#define SIGMA (1.0/3.0) +static double compute_wirevalue(int tag, int dir) +{ + struct wirevalue *wv=&WFVAL(markov_current,tag,dir); + if (wv->plus == 0) + return wv->value; + switch (wv->alg) { + case ALGO_UNIFORM: + return wv->value+wv->plus*((drand48()*2.0)-1.0); + case ALGO_GAUSS_NORMAL: + { + double x,y,r2; + do { + x = (2*drand48())-1; + y = (2*drand48())-1; + r2=x*x+y*y; + } while (r2 >= 1.0); + return wv->value+wv->plus* SIGMA * x * sqrt ( (-2 * log(r2)) /r2); + } + default: + return 0.0; + } +} + + +/*** WF 2 ***/ + +struct wf_packet { + struct wf_packet *next; + unsigned char payload[BUFSIZE]; + unsigned short size; + unsigned long long dequeue_time; + int dir; +}; + +static unsigned long outqueue_delay; +static struct wf_packet *wf_queue_in[2]; +static struct wf_packet *wf_queue_out[2]; + +int queue_size(struct wf_packet *p) { + int n = 0; + while(p){ + n++; + p=p->next; + } + return n; +} + +static struct wf_packet *_pkt_enqueue(struct wf_packet *q, struct wf_packet *pkt) +{ + if (!q) + return pkt; + + if (pkt->dequeue_time < q->dequeue_time) { + pkt->next = q; + return pkt; + } + q->next = _pkt_enqueue(q->next, pkt); + return q; +} + +static void pkt_enqueue_in(struct wf_packet *pkt) +{ + struct wf_packet *q = wf_queue_in[pkt->dir]; + pkt->next = NULL; + wf_queue_in[pkt->dir] = _pkt_enqueue(q, pkt); + fprintf(stderr,"enqueued[%d]. Size now: %d\n", pkt->dir, queue_size(wf_queue_in[pkt->dir])); + +} + +static void pkt_enqueue_out(struct wf_packet *pkt) +{ + struct wf_packet *q = wf_queue_out[pkt->dir]; + pkt->next = NULL; + wf_queue_out[pkt->dir] = _pkt_enqueue(q, pkt); + fprintf(stderr,"============= OUT =========== enqueued[%d]. Size now: %d\n", pkt->dir, queue_size(wf_queue_out[pkt->dir])); +} + +static int is_time_to_dequeue(int dir) +{ + unsigned long long now = gettimeofdayms(); + if (wf_queue_in[dir]) + return (now >= wf_queue_in[dir]->dequeue_time); + else return 0; +} + +static int process_queue_out(void) +{ + static unsigned long long now, last_out[2] = {0ULL, 0ULL}; + struct wf_packet *pkt; + int i, count[2] = {0}, old_count[2] = {0}; + do { + old_count[0] = count[0]; + old_count[1] = count[1]; + for (i = 0; i < 2; i++) { + double bandval; + pkt = wf_queue_out[i]; + if (!pkt) + continue; + bandval = compute_wirevalue(BAND,i); + if (bandval == 0) { + writepacket(i, pkt->payload, pkt->size); + wf_queue_out[i] = pkt->next; + count[i] += pkt->size; + last_out[i] = gettimeofdayms(); + free(pkt); + } else { + now = gettimeofdayms(); + pkt->dequeue_time = (unsigned long long) ((double)last_out[i] + (((double)(pkt->size + count[i])*1000) / bandval)); + if (now >= pkt->dequeue_time) { + writepacket(i, pkt->payload, pkt->size); + wf_queue_out[i] = pkt->next; + count[i] += pkt->size; + last_out[i] = now; + free(pkt); + } + } + } + } while (count[0] > old_count[0] || count[1] > old_count[1]); + + if (count[0] > 0) + fprintf(stderr,">>------------> OUT process queue: %d bytes transferred\n", count[0]); + if (count[1] > 0) + fprintf(stderr,"<------------<< OUT process queue: %d bytes transferred\n", count[1]); + return count[0] + count[1]; +} + +static int process_queue_in(void) +{ + struct wf_packet *p; + int i, count = 0; + for (i = 0; i < 2; i++) { + if(is_time_to_dequeue(i)) { + p = wf_queue_in[i]; + wf_queue_in[i] = p->next; + pkt_enqueue_out(p); + count++; + } + } + if (count > 0) + fprintf(stderr,"process queue: %d packets transferred\n", count); + return count; +} + +static struct wf_packet +*pkt_discard(struct wf_packet *q, struct wf_packet *pkt) +{ + if (!q) + return NULL; + if (pkt == q) { + free(pkt); + return q->next; + } else + q->next = pkt_discard(q->next, pkt); + return q; +} + + static void printoutc(int fd, const char *format, ...); /* markov node mgmt */ static inline struct markov_node *markov_node_new(void) @@ -235,16 +412,6 @@ } } -#define BUFSIZE 2048 -#define MAXCMD 128 -#define MGMTMODEARG 129 -#define DAEMONIZEARG 130 -#define PIDFILEARG 131 -#define LOGSOCKETARG 132 -#define LOGIDARG 133 -#define KILO (1<<10) -#define MEGA (1<<20) -#define GIGA (1<<30) static inline double max_wirevalue(int node,int tag, int dir) { @@ -263,30 +430,6 @@ srand48(v.tv_sec ^ v.tv_usec ^ getpid()); } -/*more than 98% inside the bell */ -#define SIGMA (1.0/3.0) -static double compute_wirevalue(int tag, int dir) -{ - struct wirevalue *wv=&WFVAL(markov_current,tag,dir); - if (wv->plus == 0) - return wv->value; - switch (wv->alg) { - case ALGO_UNIFORM: - return wv->value+wv->plus*((drand48()*2.0)-1.0); - case ALGO_GAUSS_NORMAL: - { - double x,y,r2; - do { - x = (2*drand48())-1; - y = (2*drand48())-1; - r2=x*x+y*y; - } while (r2 >= 1.0); - return wv->value+wv->plus* SIGMA * x * sqrt ( (-2 * log(r2)) /r2); - } - default: - return 0.0; - } -} void printlog(int priority, const char *format, ...) { @@ -544,52 +687,110 @@ } } -void handle_packet(int dir,const unsigned char *buf,int size) +unsigned long time_in_outqueue(dir) { + unsigned long bytes_in_queue = 0; + struct wf_packet *pkt = wf_queue_out[dir]; + double bw_val = max_wirevalue(markov_current,BAND,dir); + unsigned long timetogo = 0; + + if (!bw_val) { + return 0; + } + while(pkt) { + bytes_in_queue += pkt->size; + pkt = pkt->next; + } + timetogo = 1000 * (bytes_in_queue / bw_val); + fprintf(stderr,"Time that will be spent in out queue: %lu ms (queue size: %lu B, speed: %.2f B/s)\n", + timetogo, bytes_in_queue, bw_val); + return timetogo; +} + +void set_ingres_delay(struct wf_packet *pkt) +{ + int banddelay = + pkt->dequeue_time = 0; + if (banddelay >= 0) { + if (banddelay > 0 || max_wirevalue(markov_current,DELAY,pkt->dir) > 0) { + double delval=compute_wirevalue(DELAY,pkt->dir); + delval=(delval >= 0)?delval+banddelay:banddelay; + if (delval > 0) { + struct timeval tv; + unsigned long long now = gettimeofdayms(); + pkt->dequeue_time = now + delval - time_in_outqueue(pkt->dir); + } + } + } +} + +void handle_packet(struct wf_packet *pkt) +{ + int times=1; /* MTU */ /* if the packet is incosistent with the MTU of the line just drop it */ - if (min_wirevalue(markov_current,MTU,dir) > 0 && size > min_wirevalue(markov_current,MTU,dir)) + if (min_wirevalue(markov_current,MTU,pkt->dir) > 0 && pkt->size > min_wirevalue(markov_current,MTU,pkt->dir)) { + free(pkt); return; + } /* LOSS */ /* Total packet loss */ - if (min_wirevalue(markov_current,LOSS,dir) >= 100.0) + if (min_wirevalue(markov_current,LOSS,pkt->dir) >= 100.0) { + free(pkt); return; + } /* probabilistic loss */ - if (max_wirevalue(markov_current,LOSTBURST,dir) > 0) { + if (max_wirevalue(markov_current,LOSTBURST,pkt->dir) > 0) { /* Gilbert model */ - double losval=compute_wirevalue(LOSS,dir)/100; - double burstlen=compute_wirevalue(LOSTBURST,dir); + double losval=compute_wirevalue(LOSS,pkt->dir)/100; + double burstlen=compute_wirevalue(LOSTBURST,pkt->dir); double alpha=losval / (burstlen*(1-losval)); double beta=1.0 / burstlen; - switch (loss_status[dir]) { + switch (loss_status[pkt->dir]) { case OK_BURST: - if (drand48() < alpha) loss_status[dir]=FAULTY_BURST; + if (drand48() < alpha) loss_status[pkt->dir]=FAULTY_BURST; break; case FAULTY_BURST: - if (drand48() < beta) loss_status[dir]=OK_BURST; + if (drand48() < beta) loss_status[pkt->dir]=OK_BURST; break; } - if (loss_status[dir] != OK_BURST) + if (loss_status[pkt->dir] != OK_BURST) { + free(pkt); return; + } } else { - loss_status[dir] = OK_BURST; - if (max_wirevalue(markov_current,LOSS,dir) > 0) { + loss_status[pkt->dir] = OK_BURST; + if (max_wirevalue(markov_current,LOSS,pkt->dir) > 0) { /* standard non bursty model */ - double losval=compute_wirevalue(LOSS,dir)/100; - if (drand48() < losval) + double losval=compute_wirevalue(LOSS,pkt->dir)/100; + if (drand48() < losval) { + free(pkt); return; + } } } /* DUP */ /* times is the number of dup packets */ - int times=1; - if (max_wirevalue(markov_current,DDUP,dir) > 0) { - double dupval=compute_wirevalue(DDUP,dir)/100; + if (max_wirevalue(markov_current,DDUP,pkt->dir) > 0) { + double dupval=compute_wirevalue(DDUP,pkt->dir)/100; while (drand48() < dupval) times++; } + while (times > 0) { + struct wf_packet *pkt_in; + if (times > 1) { + pkt_in = malloc(sizeof(struct wf_packet)); + memcpy(pkt_in, pkt, sizeof(struct wf_packet)); + } else + pkt_in = pkt; + set_ingres_delay(pkt_in); + pkt_enqueue_in(pkt_in); + times--; + } + +#if 0 while (times>0) { int banddelay=0; @@ -657,71 +858,84 @@ } times--; } +#endif } #define MIN(X,Y) (((X)<(Y))?(X):(Y)) -static void splitpacket(const unsigned char *buf,int size,int dir) +static void splitpacket(struct wf_packet *pkt) { static unsigned char fragment[BUFSIZE][2]; static unsigned char *fragp[2]; static unsigned int rnx[2],remaining[2]; + unsigned short size = pkt->size; //fprintf(stderr,"%s: splitpacket rnx=%d remaining=%d size=%d\n",progname,rnx[dir],remaining[dir],size); - if (size==0) return; - if (rnx[dir]>0) { - register int amount=MIN(remaining[dir],size); + if (pkt->size==0) return; + if (rnx[pkt->dir]>0) { + register int amount=MIN(remaining[pkt->dir],pkt->size); //fprintf(stderr,"%s: fragment amount %d\n",progname,amount); - memcpy(fragp[dir],buf,amount); - remaining[dir]-=amount; - fragp[dir]+=amount; - buf+=amount; + memcpy(fragp[pkt->dir],pkt->payload,amount); + remaining[pkt->dir]-=amount; + fragp[pkt->dir]+=amount; size-=amount; - if (remaining[dir]==0) { + if (remaining[pkt->dir]==0) { //fprintf(stderr,"%s: delivered defrag %d\n",progname,rnx[dir]); - handle_packet(dir,fragment[dir],rnx[dir]+2); - rnx[dir]=0; + pkt->size = rnx[pkt->dir]+2; + memcpy(pkt->payload, fragment[pkt->dir], rnx[pkt->dir]+2); + handle_packet(pkt); + rnx[pkt->dir]=0; } } while (size > 0) { - rnx[dir]=(buf[0]<<8)+buf[1]; - //fprintf(stderr,"%s: packet %d size %d %x %x dir %d\n",progname,rnx[dir],size-2,buf[0],buf[1],dir); - if (rnx[dir]>1521) { - printlog(LOG_WARNING,"Packet length error size %d rnx %d",size,rnx[dir]); - rnx[dir]=0; + rnx[pkt->dir]=(pkt->payload[0]<<8)+pkt->payload[1]; + //fprintf(stderr,"%s: packet %d pkt->size %d %x %x pkt->dir %d\n",progname,rnx[pkt->dir],pkt->size-2,pkt->payload[0],pkt->payload[1],pkt->dir); + if (rnx[pkt->dir]>1521) { + printlog(LOG_WARNING,"Packet length error pkt->size %d rnx %d",pkt->size,rnx[pkt->dir]); + rnx[pkt->dir]=0; return; } - if (rnx[dir]+2 > size) { - //fprintf(stderr,"%s: begin defrag %d\n",progname,rnx[dir]); - fragp[dir]=fragment[dir]; - memcpy(fragp[dir],buf,size); - remaining[dir]=rnx[dir]+2-size; - fragp[dir]+=size; + if (rnx[pkt->dir]+2 > size) { + //fprintf(stderr,"%s: begin defrag %d\n",progname,rnx[pkt->dir]); + fragp[pkt->dir]=fragment[pkt->dir]; + memcpy(fragp[pkt->dir],pkt->payload,pkt->size); + remaining[pkt->dir]=rnx[pkt->dir]+2-size; + fragp[pkt->dir]+=size; size=0; } else { - handle_packet(dir,buf,rnx[dir]+2); - buf+=rnx[dir]+2; - size-=rnx[dir]+2; - rnx[dir]=0; + pkt->size = rnx[pkt->dir]+2; + handle_packet(pkt); + size-=rnx[pkt->dir]+2; + rnx[pkt->dir]=0; } } } + + -static void packet_in(int dir) +static int packet_in(int dir) { - unsigned char buf[BUFSIZE]; + struct wf_packet *pkt; int n; + + pkt = malloc(sizeof(struct wf_packet)); + pkt->next = NULL; + pkt->dir = dir; if(vdeplug[dir]) { - n=vde_recv(vdeplug[dir],buf+2,BUFSIZE-2,0); - buf[0]=n>>8; - buf[1]=n&0xFF; - handle_packet(dir,buf,n+2); + n=vde_recv(vdeplug[dir],pkt->payload + 2,BUFSIZE-2,0); + pkt->payload[0]=n>>8; + pkt->payload[1]=n&0xFF; + pkt->size = (unsigned short)n + 2; + handle_packet(pkt); } else { - n=read(pfd[dir].fd,buf,BUFSIZE); - if (n == 0) + n = read(pfd[dir].fd,pkt->payload,BUFSIZE); + if (n <= 0) exit (0); - splitpacket(buf,n,dir); + pkt->size = (unsigned short)n; + splitpacket(pkt); } + fprintf(stderr,"Packet In: %d\n",n); + return n; } static int check_open_fifos_n_plugs(struct pollfd *pfd,int *outfd,char *vdepath[],VDECONN *vdeplug[]) @@ -1666,7 +1880,7 @@ } } } - n=poll(pfd,npfd,delay); + n=poll(pfd,npfd,1); if (pfd[0].revents & POLLHUP || (ndirs>1 && pfd[1].revents & POLLHUP)) exit(0); if (pfd[0].revents & POLLIN) { @@ -1700,6 +1914,8 @@ exit(0);*/ } markov_try(); - packet_dequeue(); + //packet_dequeue(); + process_queue_out(); + process_queue_in(); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ Xperia(TM) PLAY It's a major breakthrough. An authentic gaming smartphone on the nation's most reliable network. And it wants your games. http://p.sf.net/sfu/verizon-sfdev _______________________________________________ vde-users mailing list vde-users@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/vde-users