Naturally I forgot to attach the code, as I was rushing to get to lunch.
R.harper
P.S. Why do my messages end up as replies to someone else's message in the archive?
_________________________________________________________________ Log på MSN Messenger direkte fra nettet http://webmessenger.msn.com/
/* Flow based Qdisc scheduler */
#include <linux/config.h>
#include <linux/module.h>
#include <linux/types.h>
#include <linux/kernel.h>
#include <linux/string.h>
#include <linux/mm.h>
#include <linux/socket.h>
#include <linux/sockios.h>
#include <linux/in.h>
#include <linux/errno.h>
#include <linux/interrupt.h>
#include <linux/timer.h>
#include <linux/if_ether.h>
#include <linux/inet.h>
#include <linux/netdevice.h>
#include <linux/etherdevice.h>
#include <linux/notifier.h>
#include <net/ip.h>
#include <net/route.h>
#include <linux/skbuff.h>
#include <net/sock.h>
#include <net/pkt_sched.h>
#include <linux/jhash.h>
#include <linux/netfilter_ipv4/ip_conntrack.h>
#include <linux/netfilter_ipv4/ip_conntrack_protocol.h>
#include <linux/netfilter_ipv4/ip_conntrack_tcp.h>
#include <linux/netfilter_ipv4/ip_tables.h>
/* Default for flow_sched_data.latency. NOTE(review): units are not
 * established anywhere in this file — presumably microseconds; confirm. */
#define LATENCY 100000
/* Default for flow_sched_data.limit: the packet limit that enqueue/requeue
 * compare sch->q.qlen against (as "limit - 1"). */
#define LIMIT 50
/* Number of per-flow slots: one FIFO, one expiry timer and one set of
 * timestamps per slot. Flows are placed by linear probing over these. */
#define HASH_SIZE 20
/* How long a flow may stay idle before flow_timer_del frees its slot:
 * 10 seconds, expressed in jiffies. */
#define FLOW_LIFETIME (10*HZ)
/* Protects conntrack->proto.tcp*/
/* Disabled: this lock is only meaningful together with the commented-out
 * conntrack-rate code; nothing in the live code takes it. */
//static DECLARE_RWLOCK(tcp_lock);
/* Private state of one "flow" qdisc instance, reached via qdisc_priv(sch).
 * Each of the HASH_SIZE slots holds one flow: its FIFO, the flow id that
 * owns it, its pacing timestamps and its idle-expiry timer. */
struct flow_sched_data {
/* Configured latency; not read by any code in this file. */
u32 latency;
/* Packet limit; enqueue/requeue accept a packet only while the
 * post-increment sch->q.qlen stays below limit - 1. */
u32 limit;
/* Optional hard cap on sch->q.qlen; 0 disables the "queue full" check. */
u32 qlen;
/* Per-slot packet FIFOs. */
struct sk_buff_head qs[HASH_SIZE];
/* Flow hash owning each slot; 0 marks the slot as free. */
u32 is_used[HASH_SIZE];
/* Timestamp recorded when a packet is dequeued from the slot; compared
 * against FLOW_LIFETIME by flow_timer_del to expire idle flows. */
psched_time_t last_pkt_sent[HASH_SIZE];
/* Earliest time the next packet of the slot may be dequeued. */
psched_time_t next_send_time[HASH_SIZE];
/* Spacing enforced between consecutive packets of the slot. */
psched_time_t interval[HASH_SIZE];
/* Per-slot idle-expiry timers (handler: flow_timer_del). */
struct timer_list remove_timer[HASH_SIZE];
/* Watchdog that unthrottles the qdisc when the next packet is due. */
struct timer_list timer;
/* Not referenced anywhere in this file. */
struct sk_buff_head fasttrack;
};
static __inline__ u32 get_hash(struct sk_buff *skb)
{
if (skb->protocol == __constant_htons(ETH_P_IP) ){
struct iphdr *iph = skb->nh.iph;
if (iph->protocol == 6){
struct tcphdr *tcph = (void *)iph + (iph->ihl*4);
return jhash_3words(iph->saddr^iph->protocol, iph->daddr,
(tcph->dest << 16 | tcph->source), 0x543298ff);
}
else{
return jhash_3words(iph->saddr, iph->daddr, iph->protocol, 0x543298ff);
}
}
return 0;
}
/*
 * flow_hash - map a packet to its per-flow slot, claiming a slot if needed.
 * @skb: packet to classify
 * @sch: the flow qdisc
 *
 * Slots are placed by linear probing starting at hash % HASH_SIZE.
 * Returns the slot index, or -1 when the flow is unknown and every slot
 * is taken.
 *
 * The lookup pass scans the whole probe sequence BEFORE any slot is
 * claimed. Slots are freed by flow_timer_del without tombstones, so
 * stopping at the first free slot (as the old fast path did) would
 * duplicate a flow that had been probed past a now-freed home slot.
 */
static __inline__ short flow_hash(struct sk_buff *skb, struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	u32 hash = get_hash(skb);
	short home, slot;
	psched_time_t now;
	int i;

	/*
	 * 0 doubles as the free-slot marker in is_used[], so such a flow can
	 * never be recorded; send it to the shared slot 0, where flow_enqueue
	 * already puts unclassified traffic.
	 */
	if (hash == 0)
		return 0;

	home = hash % HASH_SIZE;

	/* Pass 1: is this flow already established somewhere on its probe path? */
	for (i = 0; i < HASH_SIZE; i++) {
		slot = (home + i) % HASH_SIZE;
		if (q->is_used[slot] == hash)
			return slot;
	}

	/* Pass 2: new flow, claim the first free slot on the probe path. */
	for (i = 0; i < HASH_SIZE; i++) {
		slot = (home + i) % HASH_SIZE;
		if (q->is_used[slot] == 0) {
			printk("New flow, hash: %u, index: %i \n", hash, slot);
			q->is_used[slot] = hash;
			PSCHED_GET_TIME(now);
			/* TADD2 writes the sum to the target without touching 'now'. */
			PSCHED_TADD2(now, q->interval[slot], q->next_send_time[slot]);
			return slot;
		}
	}

	/* Table full: caller must drop or fall back. */
	printk(KERN_WARNING "No hash bucket found!\n");
	return -1;
}
static __inline__ int flow_is_valid(struct sk_buff *skb){
struct iphdr *iph = skb->nh.iph;
enum ip_conntrack_info ctinfo;
struct tcphdr *tcph = (void *)iph + (iph->ihl*4);
if (skb->protocol != __constant_htons(ETH_P_IP) )
return 0;
if (iph->protocol != IPPROTO_TCP)
return 0;
ip_conntrack_get(skb, &ctinfo);
if (!tcph->ack)
return 0;
if (ctinfo != IP_CT_ESTABLISHED)
return 0;
return 1;
}
/*
 * flow_enqueue - accept a packet into its per-flow FIFO.
 * @skb: packet handed down by the device layer
 * @sch: the flow qdisc
 *
 * Established TCP flows get their own slot, and all other traffic shares
 * slot 0. Packets without a netfilter mark, packets beyond the configured
 * limits, and packets for which no slot is free are dropped and counted.
 * Returns NET_XMIT_SUCCESS or NET_XMIT_CN.
 */
static int flow_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	short hash;

	/*
	 * Unmarked packets are always dropped. Check this first so a doomed
	 * packet does not claim a flow slot in flow_hash().
	 */
	if (!skb->nfmark)
		goto drop;

	if (flow_is_valid(skb) == 1)
		hash = flow_hash(skb, sch);
	else
		hash = 0;

	if (q->qlen && sch->q.qlen >= q->qlen) {
		printk("Dropping packet, queue full\n");
		goto drop;
	}

	/* flow_hash() found no free slot. */
	if (hash == -1)
		goto drop;

	/*
	 * Enforce the limit BEFORE linking the skb. The old code queued it
	 * first and then kfree'd it on the drop path, leaving a freed skb in
	 * q->qs[hash] and qlen over-counted. The threshold is unchanged: the
	 * post-increment qlen must stay below limit - 1 (u32 arithmetic).
	 */
	if (sch->q.qlen + 1 >= q->limit - 1)
		goto drop;

	__skb_queue_tail(&q->qs[hash], skb);
	sch->q.qlen++;

	/* (Re)arm the idle-expiry timer. mod_timer is valid on a pending
	 * timer; add_timer on a pending timer is a bug. */
	mod_timer(&q->remove_timer[hash], jiffies + FLOW_LIFETIME);

	sch->stats.bytes += skb->len;
	sch->stats.packets++;
	return NET_XMIT_SUCCESS;

drop:
	sch->stats.drops++;
	kfree_skb(skb);
	return NET_XMIT_CN;
}
/*
 * flow_drop - discard one packet to relieve pressure.
 * @sch: the flow qdisc
 *
 * Drops the most recently queued packet of the lowest-numbered non-empty
 * slot. Returns its length, or 0 if every slot is empty.
 */
static unsigned int flow_drop(struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	struct sk_buff *skb;
	unsigned int len;
	int i;

	/* Bounds check first: the old condition read q->qs[HASH_SIZE]
	 * (one past the array) before testing i. */
	for (i = 0; i < HASH_SIZE; i++)
		if (q->qs[i].qlen != 0)
			break;

	if (i == HASH_SIZE)
		return 0;	/* nothing queued */

	skb = __skb_dequeue_tail(&q->qs[i]);
	len = skb->len;
	kfree_skb(skb);
	sch->q.qlen--;
	sch->stats.drops++;
	return len;
}
/*
 * flow_requeue - put back a packet the device could not send.
 * @skb: packet previously returned by flow_dequeue
 * @sch: the flow qdisc
 *
 * The packet goes to the head of its slot, using the same classification
 * as flow_enqueue. If the limit is then exceeded, one packet is dropped
 * via flow_drop and NET_XMIT_CN is returned.
 */
static int flow_requeue(struct sk_buff *skb, struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	short hash = 0;

	/* Mirror flow_enqueue: only valid flows get their own slot. */
	if (flow_is_valid(skb) == 1)
		hash = flow_hash(skb, sch);

	/* The table filled up since this packet was dequeued. Requeue must
	 * not index qs[-1]; fall back to the shared slot 0. */
	if (hash < 0)
		hash = 0;

	__skb_queue_head(&q->qs[hash], skb);
	if (++sch->q.qlen < q->limit - 1)
		return NET_XMIT_SUCCESS;

	flow_drop(sch);
	return NET_XMIT_CN;
}
/*
 * is_time_to_send - find the first slot, from *i on, whose head packet is due.
 * @sch: the qdisc (unused)
 * @now: current time
 * @q: private qdisc state
 * @i: in/out slot cursor; on success it is left at the slot dequeued from
 * @min: in/out smallest positive wait seen so far (-1 = none yet)
 *
 * Dequeues and returns the head of the first non-empty slot whose
 * next_send_time has been reached. For non-empty slots that are not yet
 * due, *min is updated with the shortest remaining wait. Returns NULL
 * when no slot is due.
 */
static __inline__ struct sk_buff *
is_time_to_send(struct Qdisc *sch, psched_time_t *now, struct flow_sched_data *q, int *i, long *min)
{
	for (; *i < HASH_SIZE; ++*i) {
		int slot = *i;
		long wait = PSCHED_TDIFF(q->next_send_time[slot], *now);

		if (q->qs[slot].qlen == 0)
			continue;		/* empty slot: nothing to consider */

		if (wait <= 0)
			return __skb_dequeue(&q->qs[slot]);

		/* Not due yet: remember the soonest deadline. */
		if (*min == -1 || wait < *min)
			*min = wait;
	}
	return NULL;
}
/*
 * flow_dequeue - hand the next due packet to the device.
 * @sch: the flow qdisc
 *
 * Returns the first packet whose slot's send time has been reached, and
 * schedules that slot's next send time one interval later. If packets
 * are queued but none is due, arms the watchdog timer for the soonest
 * deadline, marks the qdisc throttled and returns NULL. With nothing
 * queued, returns NULL without arming anything.
 */
static struct sk_buff *flow_dequeue(struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	struct sk_buff *skb;
	psched_time_t now;
	long min = -1;
	long delay;
	int i = 0;

	PSCHED_GET_TIME(now);

	skb = is_time_to_send(sch, &now, q, &i, &min);
	if (skb != NULL) {
		/* i now names the slot the packet came from. */
		sch->q.qlen--;
		sch->flags &= ~TCQ_F_THROTTLED;
		/*
		 * PSCHED_TADD(now, d) adds in place, which previously made
		 * last_pkt_sent record now + interval. TADD2 leaves 'now' intact.
		 */
		PSCHED_TADD2(now, q->interval[i], q->next_send_time[i]);
		q->last_pkt_sent[i] = now;
		return skb;
	}

	/* Nothing queued at all. Stay silent: this path runs on every drain,
	 * and a printk here floods the kernel log. */
	if (min == -1)
		return NULL;

	/* Packets are waiting: wake up when the soonest one becomes due. */
	delay = PSCHED_US2JIFFIE(min);
	if (delay <= 0)
		delay = 1;
	mod_timer(&q->timer, jiffies + delay);
	sch->flags |= TCQ_F_THROTTLED;
	return NULL;
}
static void flow_reset(struct Qdisc *sch)
{
struct flow_sched_data *q = qdisc_priv(sch);
struct sk_buff *skb;
while((skb = flow_dequeue(sch)) != NULL)
kfree_skb(skb);
sch->q.qlen = 0;
sch->flags &= ~TCQ_F_THROTTLED;
del_timer(&q->timer);
}
/*
 * flow_timer - watchdog handler armed by flow_dequeue.
 * @arg: the struct Qdisc pointer stored in timer.data by flow_init
 *
 * Clears the throttled flag and reschedules the device so dequeue runs again.
 */
static void flow_timer(unsigned long arg)
{
	struct Qdisc *qdisc = (struct Qdisc *)arg;

	qdisc->flags &= ~TCQ_F_THROTTLED;
	netif_schedule(qdisc->dev);
}
/*
 * flow_timer_del - idle-expiry handler for the per-slot remove timers.
 * @p: the struct flow_sched_data pointer stored in timer.data by flow_init
 *
 * Scans every in-use slot (plus slot 0 unconditionally). Any slot whose
 * last dequeue is at least FLOW_LIFETIME in the past is freed by clearing
 * is_used.
 */
static __inline__ void flow_timer_del(unsigned long p)
{
	struct flow_sched_data *q = (struct flow_sched_data *) p;
	psched_time_t now;
	psched_tdiff_t diff;
	int i;

	printk("TIMER CALLED\n");
	/* sizeof yields size_t; %i requires an int, so convert explicitly. */
	printk("sizeof psched_time_t %i\n", (int)sizeof(psched_time_t));

	PSCHED_GET_TIME(now);
	for (i = 0; i < HASH_SIZE; i++) {
		if (q->is_used[i] == 0 && i != 0)
			continue;

		diff = PSCHED_TDIFF(now, q->last_pkt_sent[i]);
		if (PSCHED_US2JIFFIE(diff) >= FLOW_LIFETIME) {
			printk("Deleteing flow %i \n", i);
			q->is_used[i] = 0;
		}
	}
}
static int flow_init(struct Qdisc *sch, struct rtattr *opt)
{
struct flow_sched_data *q = qdisc_priv(sch);
int i;
psched_time_t now;
printk("----Initialising sch_flow\n----");
PSCHED_GET_TIME(now);
for (i=0; i < HASH_SIZE; i++){
skb_queue_head_init(&q->qs[i]);
q->is_used[i] = 0;
q->last_pkt_sent[i] = now;
//constant delay of 200ms
q->interval[i] = PSCHED_JIFFIE2US(HZ/5);
init_timer(&q->remove_timer[i]);
q->remove_timer[i].function = flow_timer_del;
q->remove_timer[i].data = (unsigned long) q;
}
init_timer(&q->timer);
q->timer.function = flow_timer;
q->timer.data = (unsigned long) sch;
return 0;
}
/*
 * flow_destroy - stop all timers before the qdisc memory is released.
 * @sch: the flow qdisc
 *
 * Uses del_timer_sync: both handlers dereference this qdisc's memory
 * (flow_timer_del reads q, flow_timer reads sch). Plain del_timer does
 * not wait for a handler already running on another CPU, so that handler
 * could touch freed memory.
 */
static void flow_destroy(struct Qdisc *sch)
{
	struct flow_sched_data *q = qdisc_priv(sch);
	int i;

	printk("----Destroy called\n");
	for (i = 0; i < HASH_SIZE; i++)
		del_timer_sync(&q->remove_timer[i]);
	del_timer_sync(&q->timer);
}
/*
 * flow_dump - netlink dump hook.
 * @sch: the flow qdisc (unused)
 * @skb: dump message being built
 *
 * Appends no attributes; reports the message length as it stands.
 */
static int flow_dump(struct Qdisc *sch, struct sk_buff *skb)
{
	int len = skb->len;

	return len;
}
/*
 * Operations table registered by flow_module_init().
 *
 * .change is deliberately left unset. The previous hook, flow_init, re-runs
 * skb_queue_head_init() and init_timer() on live state, which orphans
 * queued packets and corrupts timers that are still pending. This qdisc
 * parses no options anyway (flow_init ignores @opt). NOTE(review): without
 * a .change hook the qdisc core is expected to reject "tc qdisc change"
 * with options; confirm for the target kernel.
 */
static struct Qdisc_ops flow_qdisc_ops = {
	.id		= "flow",
	.priv_size	= sizeof(struct flow_sched_data),
	.enqueue	= flow_enqueue,
	.dequeue	= flow_dequeue,
	.requeue	= flow_requeue,
	.drop		= flow_drop,
	.init		= flow_init,
	.reset		= flow_reset,
	.destroy	= flow_destroy,
	.dump		= flow_dump,
	.owner		= THIS_MODULE,
};
/* Module load: make the "flow" qdisc available to tc. Returns
 * register_qdisc()'s status. */
static int __init flow_module_init(void)
{
	int err = register_qdisc(&flow_qdisc_ops);

	return err;
}

/* Module unload: withdraw the "flow" qdisc registration. */
static void __exit flow_module_exit(void)
{
	unregister_qdisc(&flow_qdisc_ops);
}

module_init(flow_module_init)
module_exit(flow_module_exit)
MODULE_LICENSE("GPL");
_______________________________________________ LARTC mailing list [email protected] http://mailman.ds9a.nl/cgi-bin/mailman/listinfo/lartc
