Revision: 492
          http://vde.svn.sourceforge.net/vde/?rev=492&view=rev
Author:   danielel
Date:     2011-04-10 13:52:40 +0000 (Sun, 10 Apr 2011)

Log Message:
-----------
Inverted queue order:

* Ingress queue is now clocked by the bandwidth value
* Output queue is the place where packets wait for their dequeue time, 
  which is calculated from their delay value. Its maximum length is 
  automatically set to the bandwidth-delay product (BDP).


           ingress        output
read() -> [||||||] ----> [|||||||] -----> write()
         RED/DT--^    ^          ^--expired pkt time
                      |
constant rate (-b)----/

Modified Paths:
--------------
    branches/danielinux-wirefilter2/src/wirefilter.c

Modified: branches/danielinux-wirefilter2/src/wirefilter.c
===================================================================
--- branches/danielinux-wirefilter2/src/wirefilter.c    2011-04-09 14:32:56 UTC (rev 491)
+++ branches/danielinux-wirefilter2/src/wirefilter.c    2011-04-10 13:52:40 UTC (rev 492)
@@ -41,6 +41,7 @@
 #include <stdint.h>
 
 #define min(a,b) a<b?a:b
+#define max(a,b) a>b?a:b
 
 #if defined(VDE_DARWIN) || defined(VDE_FREEBSD)
 #      if defined HAVE_SYSLIMITS_H
@@ -276,6 +277,7 @@
        if (!nofifo && wf_queue_in_tail[pkt->dir]) {
                wf_queue_in_tail[pkt->dir]->next = pkt;
                wf_queue_in_tail[pkt->dir] = pkt;
+       //      fprintf(stderr,"enqueued[%d]. Size now: %d\n", pkt->dir, 
queue_size(wf_queue_in[pkt->dir]));
                return;
        }
        wf_queue_in[pkt->dir] = _pkt_enqueue(q, pkt);
@@ -288,6 +290,17 @@
        struct wf_packet *q = wf_queue_out[pkt->dir];
        queue_size_out[pkt->dir] += pkt->size;
        pkt->next = NULL;
+       if (!q) {
+               wf_queue_out[pkt->dir] = pkt;
+               wf_queue_out_tail[pkt->dir] = pkt;
+               return;
+       }
+       if (!nofifo && wf_queue_out_tail[pkt->dir]) {
+               wf_queue_out_tail[pkt->dir]->next = pkt;
+               wf_queue_out_tail[pkt->dir] = pkt;
+               //fprintf(stderr,"============= OUT =========== enqueued[%d]. 
Size now: %d\n", pkt->dir, queue_size(wf_queue_out[pkt->dir]));
+               return;
+       }
        wf_queue_out[pkt->dir] = _pkt_enqueue(q, pkt);
        //fprintf(stderr,"============= OUT =========== enqueued[%d]. Size now: 
%d\n", pkt->dir, queue_size(wf_queue_out[pkt->dir]));
 }
@@ -295,74 +308,77 @@
 static int is_time_to_dequeue(int dir)
 {
        unsigned long long now = gettimeofdayms();
-       if (wf_queue_in[dir]) 
-               return (now >= wf_queue_in[dir]->dequeue_time);
+       if (wf_queue_out[dir])
+               return (now >= wf_queue_out[dir]->dequeue_time);
        else return 0;
 }
 
-static int process_queue_out(void)
+static int process_queue_in(void)
 {
-       static unsigned long long now, last_out[2] = {0ULL, 0ULL};
+       static unsigned long long now, last_in[2];
+       static unsigned long backlog[2] = {0U, 0U};
        struct wf_packet *pkt;
        int i, count[2] = {0}, old_count[2] = {0};
+ 
+       if (last_in[0] == 0) {
+               last_in[0] = gettimeofdayms();
+               last_in[1] = gettimeofdayms();
+       }
        do {
                old_count[0] = count[0];
                old_count[1] = count[1];
                for (i = 0; i < 2; i++) {
-                       double bandval;
-                       pkt = wf_queue_out[i];
+                       unsigned long bandval;
+                       pkt = wf_queue_in[i];
                        if (!pkt)
                                continue;
-                       bandval = compute_wirevalue(BAND,i);
+                       bandval = (unsigned long)compute_wirevalue(BAND,i);
                        if (bandval == 0) {
-                               writepacket(pkt);
-                               wf_queue_out[i] = pkt->next;
-                               queue_size_out[pkt->dir] -= pkt->size;
+                               wf_queue_in[i] = pkt->next;
+                               pkt_enqueue_out(pkt);
+                               queue_size_in[pkt->dir] -= pkt->size;
                                count[i] += pkt->size;
-                               last_out[i] = gettimeofdayms(); 
-                               free(pkt);
+                               last_in[i] = gettimeofdayms(); 
                        } else {
-                               now = gettimeofdayms();
-                               pkt->dequeue_time = (unsigned long long) 
((double)last_out[i] + (((double)(pkt->size + count[i])*1000) / bandval));
-                               if (now >= pkt->dequeue_time) {
-                                       writepacket(pkt);
-                                       wf_queue_out[i] = pkt->next;
-                                       queue_size_out[pkt->dir] -= pkt->size;
+                               unsigned long long now = gettimeofdayms();
+                               static unsigned long long delta;
+                               delta = now - last_in[pkt->dir];
+                                       
+                               backlog[pkt->dir] = (delta * bandval) / 1000U;
+                               while (pkt && (backlog[pkt->dir] > pkt->size)) {
+                                       wf_queue_in[i] = pkt->next;
+                                       pkt_enqueue_out(pkt);
+                                       queue_size_in[pkt->dir] -= pkt->size;
                                        count[i] += pkt->size;
-                                       last_out[i] = now; 
-                                       free(pkt);
+                                       last_in[i] = now; 
+                                       backlog[pkt->dir] -= pkt->size;
+                                       pkt = pkt->next;
                                }
                        }
                }
        } while (count[0] > old_count[0] || count[1] > old_count[1]);
-       /*
-       if (count[0] > 0)
-               fprintf(stderr,">>------------> OUT process queue: %d bytes 
transferred\n", count[0]);
-       if (count[1] > 0)
-               fprintf(stderr,"<------------<< OUT process queue: %d bytes 
transferred\n", count[1]);
-       */
        return count[0] + count[1];
 }
 
-static int process_queue_in(void)
+static int process_queue_out(void)
 {
        struct wf_packet *p;
-       int i, count = 0;
-       for (i = 0; i < 2; i++) {
-               if(is_time_to_dequeue(i)) {
-                       p = wf_queue_in[i];
-                       wf_queue_in[i] = p->next;
-                       queue_size_in[p->dir] -= p->size;
-                       pkt_enqueue_out(p);
-                       count++;
+       int i, count = 0, old_count;
+       do {
+               old_count = count;
+               for (i = 0; i < 2; i++) {
+                       if(is_time_to_dequeue(i)) {
+                               p = wf_queue_out[i];
+                               wf_queue_out[i] = p->next;
+                               queue_size_out[p->dir] -= p->size;
+                               writepacket(p);
+                               count++;
+                               free(p);
+                       }
                }
-       }
-       /*
-       if (count > 0)
-               fprintf(stderr,"process queue: %d packets transferred\n", 
count);
-       */
+       } while (old_count < count);
        return count;
-} 
+}
 
 static struct wf_packet 
 *pkt_discard(struct wf_packet *q, struct wf_packet *pkt)
@@ -648,6 +664,11 @@
                //fprintf(stderr, "PACKET LOSS ********************\n");
                return 0;
        }
+       static int max_q = 0;
+       if (!pkt->dir && queue_size_out[0] > max_q) { 
+               fprintf(stderr, "%llu, %lu\n", gettimeofdayms(), 
queue_size_out[0]);
+               max_q = queue_size_out[0];
+       }
        /* NOISE */
        if (max_wirevalue(markov_current,NOISE,pkt->dir) > 0) {
                double noiseval=compute_wirevalue(NOISE,pkt->dir);
@@ -699,12 +720,10 @@
        pkt->dequeue_time = 0U;
        if (max_wirevalue(markov_current,DELAY,pkt->dir) > 0) {
                double delval=compute_wirevalue(DELAY,pkt->dir);
-               unsigned long banddelay = time_in_queue(wf_queue_in[pkt->dir]);
-               delval=(delval >= 0)?delval+banddelay:banddelay;
                if (delval > 0) {
                        struct timeval tv;
                        unsigned long long now = gettimeofdayms();
-                       pkt->dequeue_time = now + delval - banddelay; 
+                       pkt->dequeue_time = now + delval; 
                }
        }
 }


This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.

------------------------------------------------------------------------------
Xperia(TM) PLAY
It's a major breakthrough. An authentic gaming
smartphone on the nation's most reliable network.
And it wants your games.
http://p.sf.net/sfu/verizon-sfdev
_______________________________________________
vde-users mailing list
vde-users@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/vde-users

Reply via email to