Network asynchronous IO.

This project evolved from the receive zero-copy support work [1].

Network AIO is based on kevent and works as a usual kevent storage
on top of an inode. When a new socket is created, it is associated
with that inode, and when some activity is detected, the appropriate
notifications are generated and kevent_naio_callback() is called.
When a new kevent is registered, the network AIO ->enqueue()
callback simply marks itself as a usual socket event watcher.
It also pins the physical userspace pages in memory and stores
the appropriate pointers in the private kevent structure.
The network AIO callback then takes those pointers to userspace
pages and tries to copy data from the receive skb queue into them
using a protocol-specific callback. That callback is very similar
to ->recvmsg(), so the two could share a lot of code in the future.

The interface is a bit horrible, but it is the simplest one.
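To illustrate, here is a rough userspace sketch, not the actual
naio_recv.c from [2]. The ukevent field usage follows
kevent_naio_enqueue() below: id.raw[0] carries the socket fd,
id.raw[1] the buffer size, and the ptr union member the destination
buffer. The kevent_ctl() wrapper, the kevent control fd and
KEVENT_CTL_ADD are assumptions, since the userspace control
interface is not part of this patch.

	/* Definitions such as struct ukevent, KEVENT_NAIO,
	 * KEVENT_SOCKET_RECV, KEVENT_REQ_ONESHOT and SO_ASYNC_SOCK come
	 * from this patch; how they reach userspace (e.g. some
	 * linux/ukevent.h header) is assumed here. */
	struct ukevent uk;
	int s, on = 1;
	char buf[BUFSIZE];		/* BUFSIZE is a placeholder */

	s = socket(AF_INET, SOCK_STREAM, 0);
	/* ... connect() ... */

	/* Switch the socket into asynchronous mode (added by this patch). */
	setsockopt(s, SOL_SOCKET, SO_ASYNC_SOCK, &on, sizeof(on));

	memset(&uk, 0, sizeof(uk));
	uk.type = KEVENT_NAIO;
	uk.event = KEVENT_SOCKET_RECV;
	uk.req_flags = KEVENT_REQ_ONESHOT;	/* one-shot request (assumed) */
	uk.id.raw[0] = s;		/* fd, read by kevent_naio_enqueue() */
	uk.id.raw[1] = sizeof(buf);	/* destination buffer size */
	uk.ptr = buf;			/* pages pinned by ->enqueue() */

	/* Hypothetical registration call through the kevent control fd. */
	kevent_ctl(kevent_fd, KEVENT_CTL_ADD, 1, &uk);

On completion, kevent_naio_callback() leaves the updated offset in
ret_data[1] and the number of bytes still to be read in id.raw[1].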

I've run several benchmarks of asynchronous receiving versus the stock recv().

Hardware.
Receiving side: Xeon 2.4 GHz, HT disabled, 1 GB RAM,
        1 Gbps Intel 8254IPI (PCI-X 133 MHz slot) e1000 adapter.
Sending side: AMD64 3500+ 2.2 GHz, 1 GB RAM,
        1 Gbps RealTek 8169 adapter integrated into the nVidia nForce3
        chipset (MSI K8N Neo2).
Connection: D-Link DGS-1216T gigabit switch.

Receiving software (naio_recv.c) can be found in the archive [2].
The sending software is a simple sendfile() based server; a rough
sketch of its shape follows below.
The receiving side runs a 2.6.15-rc7-event FC3 system, default settings.
The sending side runs a 2.6.15-1.1830_FC4 FC4 system, default settings.
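The sender sketch below is an assumption about what such a server
looks like, not the actual tool used in the tests; the port number
and file name are placeholders, and error handling is omitted.

	#include <sys/sendfile.h>
	#include <sys/socket.h>
	#include <netinet/in.h>
	#include <arpa/inet.h>
	#include <sys/stat.h>
	#include <fcntl.h>
	#include <unistd.h>

	int main(void)
	{
		int s = socket(AF_INET, SOCK_STREAM, 0), c, fd;
		struct sockaddr_in sa = { .sin_family = AF_INET,
					  .sin_port = htons(12345) };	/* placeholder port */
		struct stat st;
		off_t off = 0;

		bind(s, (struct sockaddr *)&sa, sizeof(sa));
		listen(s, 1);
		c = accept(s, NULL, NULL);

		fd = open("data.bin", O_RDONLY);	/* placeholder file */
		fstat(fd, &st);

		/* Push the whole file through sendfile() until done. */
		while (off < st.st_size)
			if (sendfile(c, fd, &off, st.st_size - off) <= 0)
				break;
		return 0;
	}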

Results.
The client receives 1 GB of data in each of 8 runs (4 with
asynchronous receiving and 4 with synchronous recv()).
Each of the 4 graph panels shows the speed of both modes and the
CPU usage during the test.
Performance reported by netperf-2.3 is about 400 Mbit/sec.

Picture attached.

[1]. http://tservice.net.ru/~s0mbre/old/?section=projects&item=recv_zero_copy
[2]. http://tservice.net.ru/~s0mbre/archive/kevent/

Signed-off-by: Evgeniy Polyakov <[EMAIL PROTECTED]>

diff --git a/arch/i386/kernel/syscall_table.S b/arch/i386/kernel/syscall_table.S
index 9b21a31..30113b7 100644
--- a/arch/i386/kernel/syscall_table.S
+++ b/arch/i386/kernel/syscall_table.S
@@ -294,3 +294,4 @@ ENTRY(sys_call_table)
        .long sys_inotify_init
        .long sys_inotify_add_watch
        .long sys_inotify_rm_watch
+       .long sys_aio_recv
diff --git a/arch/x86_64/ia32/ia32entry.S b/arch/x86_64/ia32/ia32entry.S
index e0eb0c7..93c4fd4 100644
--- a/arch/x86_64/ia32/ia32entry.S
+++ b/arch/x86_64/ia32/ia32entry.S
@@ -643,6 +643,7 @@ ia32_sys_call_table:
        .quad sys_inotify_init
        .quad sys_inotify_add_watch
        .quad sys_inotify_rm_watch
+       .quad sys_aio_recv
 ia32_syscall_end:              
        .rept IA32_NR_syscalls-(ia32_syscall_end-ia32_sys_call_table)/8
                .quad ni_syscall
diff --git a/include/asm-i386/socket.h b/include/asm-i386/socket.h
index 802ae76..3473f5c 100644
--- a/include/asm-i386/socket.h
+++ b/include/asm-i386/socket.h
@@ -49,4 +49,6 @@
 
 #define SO_PEERSEC             31
 
+#define SO_ASYNC_SOCK          34
+
 #endif /* _ASM_SOCKET_H */
diff --git a/include/asm-i386/unistd.h b/include/asm-i386/unistd.h
index 0f92e78..0207851 100644
--- a/include/asm-i386/unistd.h
+++ b/include/asm-i386/unistd.h
@@ -299,8 +299,9 @@
 #define __NR_inotify_init      291
 #define __NR_inotify_add_watch 292
 #define __NR_inotify_rm_watch  293
+#define __NR_aio_recv          294
 
-#define NR_syscalls 294
+#define NR_syscalls 295
 
 /*
  * user-visible error numbers are in the range -1 - -128: see
diff --git a/include/asm-x86_64/ia32_unistd.h b/include/asm-x86_64/ia32_unistd.h
index d5166ec..9fba3a0 100644
--- a/include/asm-x86_64/ia32_unistd.h
+++ b/include/asm-x86_64/ia32_unistd.h
@@ -299,7 +299,8 @@
 #define __NR_ia32_inotify_init         291
 #define __NR_ia32_inotify_add_watch    292
 #define __NR_ia32_inotify_rm_watch     293
+#define __NR_ia32_aio_recv             294
 
-#define IA32_NR_syscalls 294   /* must be > than biggest syscall! */
+#define IA32_NR_syscalls 295   /* must be > than biggest syscall! */
 
 #endif /* _ASM_X86_64_IA32_UNISTD_H_ */
diff --git a/include/asm-x86_64/socket.h b/include/asm-x86_64/socket.h
index f2cdbea..1f31f86 100644
--- a/include/asm-x86_64/socket.h
+++ b/include/asm-x86_64/socket.h
@@ -49,4 +49,6 @@
 
 #define SO_PEERSEC             31
 
+#define SO_ASYNC_SOCK          34
+
 #endif /* _ASM_SOCKET_H */
diff --git a/include/asm-x86_64/unistd.h b/include/asm-x86_64/unistd.h
index 2c42150..0d7ee3b 100644
--- a/include/asm-x86_64/unistd.h
+++ b/include/asm-x86_64/unistd.h
@@ -571,8 +571,10 @@ __SYSCALL(__NR_inotify_init, sys_inotify
 __SYSCALL(__NR_inotify_add_watch, sys_inotify_add_watch)
 #define __NR_inotify_rm_watch  255
 __SYSCALL(__NR_inotify_rm_watch, sys_inotify_rm_watch)
+#define __NR_aio_recv          256
+__SYSCALL(__NR_aio_recv, sys_aio_recv)
 
-#define __NR_syscall_max __NR_inotify_rm_watch
+#define __NR_syscall_max __NR_aio_recv
 #ifndef __NO_STUBS
 
 /* user-visible error numbers are in the range -1 - -4095 */
diff --git a/include/linux/kevent.h b/include/linux/kevent.h
index 3e164d1..8cf83dc 100644
--- a/include/linux/kevent.h
+++ b/include/linux/kevent.h
@@ -42,6 +42,7 @@ enum {
        KEVENT_INODE,
        KEVENT_TIMER,
        KEVENT_POLL,
+       KEVENT_NAIO,
 
        KEVENT_MAX,
 };
@@ -59,7 +60,7 @@ enum {
 };
 
 /*
- * Socket events.
+ * Socket/network asynchronous IO events.
  */
 enum {
        KEVENT_SOCKET_RECV      = 0x1,
@@ -109,7 +110,10 @@ struct ukevent
        __u32                   req_flags;              /* Per-event request flags */
        __u32                   ret_flags;              /* Per-event return flags */
        __u32                   ret_data[2];            /* Event return data. Event originator fills it with anything it likes. */
-       __u32                   user[2];                /* User's data. It is not used, just copied to/from user. */
+       union {
+               __u32           user[2];                /* User's data. It is not used, just copied to/from user. */
+               void            *ptr;
+       };
 };
 
 enum {
@@ -159,7 +163,7 @@ struct kevent
 
        kevent_callback_t       callback;               /* Is called each time new event has been caught. */
        kevent_callback_t       enqueue;                /* Is called each time new event is queued. */
-       kevent_callback_t       dequeue;                /* Is called each time new event is dequeued. */
+       kevent_callback_t       dequeue;                /* Is called each time event is dequeued. */

        void                    *priv;                  /* Private data for different storages.
                                                         * poll()/select storage has a list of wait_queue_t containers
@@ -212,10 +216,12 @@ void kevent_storage_dequeue(struct keven
 
 int kevent_break(struct kevent *k);
 int kevent_init(struct kevent *k);
+
 int kevent_init_socket(struct kevent *k);
 int kevent_init_inode(struct kevent *k);
 int kevent_init_timer(struct kevent *k);
 int kevent_init_poll(struct kevent *k);
+int kevent_init_naio(struct kevent *k);
 
 void kevent_storage_ready(struct kevent_storage *st, kevent_callback_t ready_callback, u32 event);
 int kevent_storage_init(__u32 type, __u32 event, void *origin, struct kevent_storage *st);
@@ -238,6 +244,8 @@ static inline void kevent_inode_remove(s
 #endif /* CONFIG_KEVENT_INODE */
 #ifdef CONFIG_KEVENT_SOCKET
 void kevent_socket_notify(struct sock *sock, u32 event);
+int kevent_socket_dequeue(struct kevent *k);
+int kevent_socket_enqueue(struct kevent *k);
 #else
 static inline void kevent_socket_notify(struct sock *sock, u32 event)
 {
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index 8c5d600..8dbd67d 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -1232,6 +1232,8 @@ extern struct sk_buff *skb_recv_datagram
                                         int noblock, int *err);
 extern unsigned int    datagram_poll(struct file *file, struct socket *sock,
                                     struct poll_table_struct *wait);
+extern int            skb_copy_datagram(const struct sk_buff *from, 
+                                        int offset, void *dst, int size);
 extern int            skb_copy_datagram_iovec(const struct sk_buff *from,
                                               int offset, struct iovec *to,
                                               int size);
diff --git a/include/linux/syscalls.h b/include/linux/syscalls.h
index c7007b1..e729fe1 100644
--- a/include/linux/syscalls.h
+++ b/include/linux/syscalls.h
@@ -512,4 +512,5 @@ asmlinkage long sys_ioprio_get(int which
 asmlinkage long sys_set_mempolicy(int mode, unsigned long __user *nmask,
                                        unsigned long maxnode);
 
+asmlinkage long sys_aio_recv(int fd, void __user *buf, size_t size, unsigned flags);
 #endif
diff --git a/include/net/sock.h b/include/net/sock.h
index e7eaaad..c086188 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -213,6 +213,7 @@ struct sock {
        int                     sk_route_caps;
        unsigned long           sk_flags;
        unsigned long           sk_lingertime;
+       unsigned long           sk_async_sock;
        /*
         * The backlog queue is special, it is always used with
         * the per-socket spinlock held and requires low latency
@@ -466,7 +467,6 @@ static inline void sk_stream_set_owner_r
        skb->destructor = sk_stream_rfree;
        atomic_add(skb->truesize, &sk->sk_rmem_alloc);
        sk->sk_forward_alloc -= skb->truesize;
-       kevent_socket_notify(sk, KEVENT_SOCKET_RECV);
 }
 
 static inline void sk_stream_free_skb(struct sock *sk, struct sk_buff *skb)
@@ -550,6 +550,9 @@ struct proto {
 
        int                     (*backlog_rcv) (struct sock *sk, 
                                                struct sk_buff *skb);
+       
+       int                     (*async_recv) (struct sock *sk, 
+                                               void *dst, size_t size);
 
        /* Keeping track of sk's, looking them up, and port selection methods. */
        void                    (*hash)(struct sock *sk);
diff --git a/include/net/tcp.h b/include/net/tcp.h
index 77626a1..e14e48d 100644
--- a/include/net/tcp.h
+++ b/include/net/tcp.h
@@ -392,6 +392,7 @@ extern int                  tcp_setsockopt(struct sock 
                                               int optname, char __user *optval, 
                                               int optlen);
 extern void                    tcp_set_keepalive(struct sock *sk, int val);
+extern int                     tcp_async_recv(struct sock *sk, void *dst, size_t size);
 extern int                     tcp_recvmsg(struct kiocb *iocb, struct sock *sk,
                                            struct msghdr *msg,
                                            size_t len, int nonblock, 
diff --git a/kernel/kevent/Kconfig b/kernel/kevent/Kconfig
index 5aae2ef..a52a86f 100644
--- a/kernel/kevent/Kconfig
+++ b/kernel/kevent/Kconfig
@@ -31,3 +31,9 @@ config KEVENT_POLL
        depends on KEVENT
        help
          This option allows to use kevent subsystem for poll()/select() notifications.
+
+config KEVENT_NAIO
+       bool "Network asynchronous IO"
+       depends on KEVENT_SOCKET
+       help
+         This option enables the kevent-based network asynchronous IO subsystem.
diff --git a/kernel/kevent/Makefile b/kernel/kevent/Makefile
index 4609205..2bc7135 100644
--- a/kernel/kevent/Makefile
+++ b/kernel/kevent/Makefile
@@ -3,3 +3,4 @@ obj-$(CONFIG_KEVENT_SOCKET) += kevent_so
 obj-$(CONFIG_KEVENT_INODE) += kevent_inode.o
 obj-$(CONFIG_KEVENT_TIMER) += kevent_timer.o
 obj-$(CONFIG_KEVENT_POLL) += kevent_poll.o
+obj-$(CONFIG_KEVENT_NAIO) += kevent_naio.o
diff --git a/kernel/kevent/kevent.c b/kernel/kevent/kevent.c
index 61a442f..ef08aa7 100644
--- a/kernel/kevent/kevent.c
+++ b/kernel/kevent/kevent.c
@@ -87,6 +87,9 @@ int kevent_init(struct kevent *k)
                return -E2BIG;
        
        switch (k->event.type) {
+               case KEVENT_NAIO:
+                       err = kevent_init_naio(k);
+                       break;
                case KEVENT_SOCKET:
                        err = kevent_init_socket(k);
                        break;
@@ -116,9 +119,10 @@ static inline u32 kevent_set_event(struc
        u32 ev = event & k->event.event;
 
        st->event &= ~ev;
+#if 0
        if (ev)
                k->event.ret_data[1] = ev;
-
+#endif
        return ev;
 }
 
@@ -165,40 +169,42 @@ void kevent_storage_dequeue(struct keven
        unsigned long flags;
 
        spin_lock_irqsave(&st->lock, flags);
-       list_del(&k->storage_entry);
-       st->qlen--;
+       if (k->storage_entry.next != LIST_POISON1) {
+               list_del(&k->storage_entry);
+               st->qlen--;
+       }
        spin_unlock_irqrestore(&st->lock, flags);
 }
 
 static void __kevent_requeue(struct kevent *k, u32 event)
 {
-       int err, broken;
+       int err, rem = 0;
+               
+       wake_up(&k->user->wait);
 
        err = k->callback(k);
-       if (err < 0)
-               kevent_break(k);
-       
-       spin_lock(&k->lock);
-       broken = (k->event.ret_flags & KEVENT_RET_BROKEN);
-       spin_unlock(&k->lock);
 
-       if (err || broken) {
-               spin_lock(&k->lock);
+       spin_lock(&k->lock);
+       if (err > 0) {
                k->event.ret_flags |= KEVENT_RET_DONE;
-               if (event & k->event.event)
-                       k->event.ret_data[0] = event & k->event.event;
-               spin_unlock(&k->lock);
-
-               list_del(&k->storage_entry);
-               list_add_tail(&k->storage_entry, &k->st->list);
+       } else if (err < 0) {
+               k->event.ret_flags |= KEVENT_RET_BROKEN;
+               k->event.ret_flags |= KEVENT_RET_DONE;
+       }
+       rem = (k->event.req_flags & KEVENT_REQ_ONESHOT);
+       spin_unlock(&k->lock);
 
+       if (err) {
+               if (rem) {
+                       list_del(&k->storage_entry);
+                       k->st->qlen--;
+               }
                spin_lock(&k->user->ready_lock);
                if (k->ready_entry.next == LIST_POISON1) {
                        list_add_tail(&k->ready_entry, &k->user->ready_list);
                        k->user->ready_num++;
                }
                spin_unlock(&k->user->ready_lock);
-               wake_up(&k->user->wait);
        }
 }
 
@@ -218,11 +224,10 @@ void kevent_storage_ready(struct kevent_
 {
        unsigned long flags;
        struct kevent *k, *n;
-       u32 ev;
        unsigned int qlen;
+       u32 ev = 0;
 
-       spin_lock_irqsave(&st->lock, flags);
-       
+       spin_lock_bh(&st->lock);
        st->event |= event;
        qlen = st->qlen;
        
@@ -234,19 +239,15 @@ void kevent_storage_ready(struct kevent_
                        if (ready_callback)
                                ready_callback(k);
 
-                       spin_lock(&k->lock);
-                       ev = (k->event.event & event);
-                       if (!ev) {
-                               spin_unlock(&k->lock);
-                               continue;
-                       }
-                       kevent_set_event(st, k, event);
-                       spin_unlock(&k->lock);
+                       ev |= (event & k->event.event);
 
-                       __kevent_requeue(k, event);
+                       if (event & k->event.event)
+                               __kevent_requeue(k, event);
                }
        }
-       spin_unlock_irqrestore(&st->lock, flags);
+       
+       st->event &= ~ev;
+       spin_unlock_bh(&st->lock);
 }
 
 int kevent_storage_init(__u32 type, __u32 event, void *origin, struct kevent_storage *st)
diff --git a/kernel/kevent/kevent_init.c b/kernel/kevent/kevent_init.c
index 2a75c40..74659df 100644
--- a/kernel/kevent/kevent_init.c
+++ b/kernel/kevent/kevent_init.c
@@ -33,7 +33,6 @@ int kevent_break(struct kevent *k)
        spin_lock_irqsave(&k->lock, flags);
        k->event.ret_flags |= KEVENT_RET_BROKEN;
        spin_unlock_irqrestore(&k->lock, flags);
-       printk("%s: k=%p.\n", __func__, k);
        return 0;
 }
 
@@ -68,3 +67,11 @@ int kevent_init_poll(struct kevent *k)
        return -ENODEV;
 }
 #endif
+
+#ifndef CONFIG_KEVENT_NAIO
+int kevent_init_naio(struct kevent *k)
+{
+       kevent_break(k);
+       return -ENODEV;
+}
+#endif
diff --git a/kernel/kevent/kevent_naio.c b/kernel/kevent/kevent_naio.c
new file mode 100644
index 0000000..477cbb1
--- /dev/null
+++ b/kernel/kevent/kevent_naio.c
@@ -0,0 +1,182 @@
+/*
+ *     kevent_naio.c
+ * 
+ * 2006 Copyright (c) Evgeniy Polyakov <[EMAIL PROTECTED]>
+ * All rights reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/types.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/file.h>
+#include <linux/pagemap.h>
+#include <linux/kevent.h>
+
+#include <net/sock.h>
+#include <net/tcp_states.h>
+
+static int kevent_naio_enqueue(struct kevent *k);
+static int kevent_naio_dequeue(struct kevent *k);
+static int kevent_naio_callback(struct kevent *k);
+
+asmlinkage long sys_aio_recv(int fd, void __user *buf, size_t size, unsigned flags)
+{
+       return 0;
+}
+
+static int kevent_naio_enqueue(struct kevent *k)
+{
+       int err, i;
+       struct page **page;
+       void *addr;
+       unsigned int size = k->event.id.raw[1];
+       int num = size/PAGE_SIZE + 1;
+       struct file *file;
+       struct sock *sk;
+       int fput_needed;
+
+       file = fget_light(k->event.id.raw[0], &fput_needed);
+       if (!file)
+               return -ENODEV;
+
+       err = -EINVAL;
+       if (!file->f_dentry || !file->f_dentry->d_inode)
+               goto err_out_fput;
+
+       sk = SOCKET_I(file->f_dentry->d_inode)->sk;
+
+       err = -ESOCKTNOSUPPORT;
+       if (!sk || !sk->sk_prot->async_recv)
+               goto err_out_fput;
+       
+       page = kmalloc(sizeof(struct page *) * num, GFP_KERNEL);
+       if (!page)
+               return -ENOMEM;
+
+       addr = k->event.ptr;
+
+       err = get_user_pages(current, current->mm, (unsigned long)addr, num, 1, 0, page, NULL);
+       if (err <= 0)
+               goto err_out_free;
+       num = err;
+
+       k->event.ret_data[0] = num;
+       k->event.ret_data[1] = offset_in_page(k->event.ptr);
+       k->priv = page;
+
+       err = kevent_socket_enqueue(k);
+       if (err < 0)
+               goto err_out_put_pages;
+
+       err = kevent_naio_callback(k);
+       if (err < 0)
+               goto err_out_put_pages;
+
+       return err;
+
+err_out_put_pages:
+       for (i=0; i<num; ++i)
+               page_cache_release(page[i]);
+err_out_free:
+       kfree(page);
+err_out_fput:
+       fput_light(file, fput_needed);
+
+       return err;
+}
+
+static int kevent_naio_dequeue(struct kevent *k)
+{
+       int err, i, num;
+       struct page **page = k->priv;
+
+       num = k->event.ret_data[0];
+
+       err = kevent_socket_dequeue(k);
+
+       for (i=0; i<num; ++i)
+               page_cache_release(page[i]);
+
+       kfree(k->priv);
+       k->priv = NULL;
+
+       return err;
+}
+
+static int kevent_naio_callback(struct kevent *k)
+{
+       struct inode *inode = k->st->origin;
+       struct sock *sk = SOCKET_I(inode)->sk;
+       unsigned int size = k->event.id.raw[1];
+       unsigned int off = k->event.ret_data[1];
+       struct page **pages = k->priv, *page;
+       int ready = 0, num = 0, err = 0;
+       void *ptr, *optr;
+       unsigned int len;
+
+       while (size) {
+               num = off / PAGE_SIZE;
+
+               page = pages[num];
+
+               optr = ptr = kmap_atomic(page, KM_IRQ0);
+               if (!ptr) {
+                       err = -ENOMEM;
+                       break;
+               }
+
+               ptr += off % PAGE_SIZE;
+               len = min_t(unsigned int, PAGE_SIZE - (ptr - optr), size);
+
+               /*
+                * sk_prot->async_recv() can return either number of bytes read,
+                * or negative error value, or zero if socket is closed.
+                */
+               err = sk->sk_prot->async_recv(sk, ptr, len);
+
+               kunmap_atomic(optr, KM_IRQ0);
+
+               if (err > 0) {
+                       num++;
+                       size -= err;
+                       off += err;
+               }
+
+               if (err != len)
+                       break;
+       }
+
+       k->event.ret_data[1] = off;
+       k->event.id.raw[1] = size;
+
+       if (err == 0 || (err < 0 && err != -EAGAIN))
+               ready = -1;
+
+       if (!size)
+               ready = 1;
+
+       return ready;
+}
+
+int kevent_init_naio(struct kevent *k)
+{
+       k->enqueue = &kevent_naio_enqueue;
+       k->dequeue = &kevent_naio_dequeue;
+       k->callback = &kevent_naio_callback;
+       return 0;
+}
diff --git a/kernel/kevent/kevent_socket.c b/kernel/kevent/kevent_socket.c
index 1606eb6..fd74f3b 100644
--- a/kernel/kevent/kevent_socket.c
+++ b/kernel/kevent/kevent_socket.c
@@ -59,7 +59,7 @@ static int kevent_socket_callback(struct
        return 0;
 }
 
-static int kevent_socket_enqueue(struct kevent *k)
+int kevent_socket_enqueue(struct kevent *k)
 {
        struct file *file;
        struct inode *inode;
@@ -93,7 +93,7 @@ err_out_fput:
        return err;
 }
 
-static int kevent_socket_dequeue(struct kevent *k)
+int kevent_socket_dequeue(struct kevent *k)
 {
        struct inode *inode = k->st->origin;
 
diff --git a/kernel/kevent/kevent_user.c b/kernel/kevent/kevent_user.c
index 314ad18..ae776c5 100644
--- a/kernel/kevent/kevent_user.c
+++ b/kernel/kevent/kevent_user.c
@@ -252,6 +252,7 @@ static int kevent_user_ctl_modify(struct
                        spin_lock_irqsave(&k->lock, flags);
                        k->event.event = uk.event;
                        k->event.req_flags = uk.req_flags;
+                       k->event.ret_flags = 0;
                        spin_unlock_irqrestore(&k->lock, flags);
                        kevent_requeue(k);
                } else
@@ -343,6 +344,10 @@ static int kevent_user_ctl_add(struct ke
                        break;
                }
 
+               k->event.ret_flags = 0;
+               k->event.ret_data[0] = 0;
+               k->event.ret_data[1] = 0;
+
                arg += sizeof(struct ukevent);
 
                err = kevent_init(k);
@@ -362,6 +367,7 @@ static int kevent_user_ctl_add(struct ke
                        if (err < 0)
                                kevent_free(k);
                        num++;
+                       k->event.ret_flags = 0;
                }  
                
                if (err >= 0) {
diff --git a/kernel/sys_ni.c b/kernel/sys_ni.c
index 1ab2370..fbc2939 100644
--- a/kernel/sys_ni.c
+++ b/kernel/sys_ni.c
@@ -90,3 +90,5 @@ cond_syscall(sys_pciconfig_iobase);
 cond_syscall(sys32_ipc);
 cond_syscall(sys32_sysctl);
 cond_syscall(ppc_rtas);
+
+cond_syscall(sys_aio_recv);
diff --git a/net/core/datagram.c b/net/core/datagram.c
index 1bcfef5..b2f19a7 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -200,6 +200,60 @@ void skb_free_datagram(struct sock *sk, 
 }
 
 /**
+ *     skb_copy_datagram - Copy a datagram.
+ *     @skb: buffer to copy
+ *     @offset: offset in the buffer to start copying from
+ *     @to: pointer to copy to
+ *     @len: amount of data to copy from buffer to iovec
+ */
+int skb_copy_datagram(const struct sk_buff *skb, int offset,
+                           void *to, int len)
+{
+       int i, fraglen, end = 0;
+       struct sk_buff *next = skb_shinfo(skb)->frag_list;
+
+       if (!len)
+               return 0;
+
+next_skb:
+       fraglen = skb_headlen(skb);
+       i = -1;
+
+       while (1) {
+               int start = end;
+
+               if ((end += fraglen) > offset) {
+                       int copy = end - offset, o = offset - start;
+
+                       if (copy > len)
+                               copy = len;
+                       if (i == -1)
+                               memcpy(to, skb->data + o, copy);
+                       else {
+                               skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
+                               struct page *page = frag->page;
+                               void *p = kmap(page) + frag->page_offset + o;
+                               memcpy(to, p, copy);
+                               kunmap(page);
+                       }
+                       if (!(len -= copy))
+                               return 0;
+                       offset += copy;
+               }
+               if (++i >= skb_shinfo(skb)->nr_frags)
+                       break;
+               fraglen = skb_shinfo(skb)->frags[i].size;
+       }
+       if (next) {
+               skb = next;
+               BUG_ON(skb_shinfo(skb)->frag_list);
+               next = skb->next;
+               goto next_skb;
+       }
+       return -EFAULT;
+}
+
+/**
  *     skb_copy_datagram_iovec - Copy a datagram to an iovec.
  *     @skb: buffer to copy
  *     @offset: offset in the buffer to start copying from
@@ -467,6 +521,7 @@ unsigned int datagram_poll(struct file *
 
 EXPORT_SYMBOL(datagram_poll);
 EXPORT_SYMBOL(skb_copy_and_csum_datagram_iovec);
+EXPORT_SYMBOL(skb_copy_datagram);
 EXPORT_SYMBOL(skb_copy_datagram_iovec);
 EXPORT_SYMBOL(skb_free_datagram);
 EXPORT_SYMBOL(skb_recv_datagram);
diff --git a/net/core/sock.c b/net/core/sock.c
index cdc3f82..eed500b 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -455,6 +455,11 @@ set_rcvbuf:
                        spin_unlock_bh(&sk->sk_lock.slock);
                        ret = -ENONET;
                        break;
+               case SO_ASYNC_SOCK:
+                       spin_lock_bh(&sk->sk_lock.slock);
+                       sk->sk_async_sock = valbool;
+                       spin_unlock_bh(&sk->sk_lock.slock);
+                       break;
 
                /* We implement the SO_SNDLOWAT etc to
                   not be settable (1003.1g 5.3) */
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index ef98b14..e9129c5 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -206,6 +206,7 @@
  *                                     lingertime == 0 (RFC 793 ABORT Call)
  *     Hirokazu Takahashi      :       Use copy_from_user() instead of
  *                                     csum_and_copy_from_user() if possible.
+ *     Evgeniy Polyakov        :       Network asynchronous IO.
  *
  *             This program is free software; you can redistribute it and/or
  *             modify it under the terms of the GNU General Public License
@@ -1090,6 +1091,160 @@ int tcp_read_sock(struct sock *sk, read_
 }
 
 /*
+ * Must be called with locked sock.
+ */
+int tcp_async_recv(struct sock *sk, void *dst, size_t len)
+{
+       struct tcp_sock *tp = tcp_sk(sk);
+       int copied = 0;
+       u32 *seq;
+       unsigned long used;
+       int err;
+       int target;             /* Read at least this many bytes */
+
+       TCP_CHECK_TIMER(sk);
+
+       err = -ENOTCONN;
+       if (sk->sk_state == TCP_LISTEN)
+               goto out;
+
+       seq = &tp->copied_seq;
+
+       target = sock_rcvlowat(sk, 0, len);
+
+       do {
+               struct sk_buff *skb;
+               u32 offset;
+
+               /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
+               if (tp->urg_data && tp->urg_seq == *seq) {
+                       if (copied)
+                               break;
+               }
+
+               /* Next get a buffer. */
+
+               skb = skb_peek(&sk->sk_receive_queue);
+               do {
+                       if (!skb)
+                               break;
+
+                       /* Now that we have two receive queues this
+                        * shouldn't happen.
+                        */
+                       if (before(*seq, TCP_SKB_CB(skb)->seq)) {
+                               printk(KERN_INFO "async_recv bug: copied %X "
+                                      "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+                               break;
+                       }
+                       offset = *seq - TCP_SKB_CB(skb)->seq;
+                       if (skb->h.th->syn)
+                               offset--;
+                       if (offset < skb->len)
+                               goto found_ok_skb;
+                       if (skb->h.th->fin)
+                               goto found_fin_ok;
+                       skb = skb->next;
+               } while (skb != (struct sk_buff *)&sk->sk_receive_queue);
+
+               if (copied)
+                       break;
+
+               if (sock_flag(sk, SOCK_DONE))
+                       break;
+
+               if (sk->sk_err) {
+                       copied = sock_error(sk);
+                       break;
+               }
+
+               if (sk->sk_shutdown & RCV_SHUTDOWN)
+                       break;
+
+               if (sk->sk_state == TCP_CLOSE) {
+                       if (!sock_flag(sk, SOCK_DONE)) {
+                               /* This occurs when user tries to read
+                                * from never connected socket.
+                                */
+                               copied = -ENOTCONN;
+                               break;
+                       }
+                       break;
+               }
+
+               copied = -EAGAIN;
+               break;
+
+       found_ok_skb:
+               /* Ok so how much can we use? */
+               used = skb->len - offset;
+               if (len < used)
+                       used = len;
+
+               /* Do we have urgent data here? */
+               if (tp->urg_data) {
+                       u32 urg_offset = tp->urg_seq - *seq;
+                       if (urg_offset < used) {
+                               if (!urg_offset) {
+                                       if (!sock_flag(sk, SOCK_URGINLINE)) {
+                                               ++*seq;
+                                               offset++;
+                                               used--;
+                                               if (!used)
+                                                       goto skip_copy;
+                                       }
+                               } else
+                                       used = urg_offset;
+                       }
+               }
+
+               err = skb_copy_datagram(skb, offset, dst, used);
+               if (err) {
+                       /* Exception. Bailout! */
+                       if (!copied)
+                               copied = -EFAULT;
+                       break;
+               }
+
+               *seq += used;
+               copied += used;
+               len -= used;
+               dst += used;
+
+               tcp_rcv_space_adjust(sk);
+
+skip_copy:
+               if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
+                       tp->urg_data = 0;
+                       tcp_fast_path_check(sk, tp);
+               }
+               if (used + offset < skb->len)
+                       continue;
+
+               if (skb->h.th->fin)
+                       goto found_fin_ok;
+               sk_eat_skb(sk, skb);
+               continue;
+
+       found_fin_ok:
+               /* Process the FIN. */
+               ++*seq;
+               sk_eat_skb(sk, skb);
+               break;
+       } while (len > 0);
+
+       /* Clean up data we have read: This will do ACK frames. */
+       cleanup_rbuf(sk, copied);
+
+       TCP_CHECK_TIMER(sk);
+       return copied;
+
+out:
+       TCP_CHECK_TIMER(sk);
+       return err;
+}
+
+/*
  *     This routine copies from a sock struct into the user buffer.
  *
  *     Technical note: in 2.3 we work on _locked_ socket, so that
@@ -2136,6 +2291,7 @@ EXPORT_SYMBOL(tcp_getsockopt);
 EXPORT_SYMBOL(tcp_ioctl);
 EXPORT_SYMBOL(tcp_poll);
 EXPORT_SYMBOL(tcp_read_sock);
+EXPORT_SYMBOL(tcp_async_recv);
 EXPORT_SYMBOL(tcp_recvmsg);
 EXPORT_SYMBOL(tcp_sendmsg);
 EXPORT_SYMBOL(tcp_sendpage);
diff --git a/net/ipv4/tcp_input.c b/net/ipv4/tcp_input.c
index 15d3340..8107f6e 100644
--- a/net/ipv4/tcp_input.c
+++ b/net/ipv4/tcp_input.c
@@ -3820,7 +3820,7 @@ int tcp_rcv_established(struct sock *sk,
                        if (tp->ucopy.task == current &&
                            tp->copied_seq == tp->rcv_nxt &&
                            len - tcp_header_len <= tp->ucopy.len &&
-                           sock_owned_by_user(sk)) {
+                           sock_owned_by_user(sk) && !sk->sk_async_sock) {
                                __set_current_state(TASK_RUNNING);
 
                                if (!tcp_copy_to_iovec(sk, skb, tcp_header_len)) {
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index c2a542b..17ad358 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -1248,11 +1248,17 @@ process:
 
        bh_lock_sock(sk);
        ret = 0;
-       if (!sock_owned_by_user(sk)) {
-               if (!tcp_prequeue(sk, skb))
-                       ret = tcp_v4_do_rcv(sk, skb);
-       } else
-               sk_add_backlog(sk, skb);
+       if (sk->sk_async_sock) {
+               local_bh_disable();
+               ret = tcp_v4_do_rcv(sk, skb);
+               local_bh_enable();
+       } else {
+               if (!sock_owned_by_user(sk)) {
+                       if (!tcp_prequeue(sk, skb))
+                               ret = tcp_v4_do_rcv(sk, skb);
+               } else
+                       sk_add_backlog(sk, skb);
+       }
        bh_unlock_sock(sk);
 
        sock_put(sk);
@@ -1977,6 +1983,7 @@ struct proto tcp_prot = {
        .getsockopt             = tcp_getsockopt,
        .sendmsg                = tcp_sendmsg,
        .recvmsg                = tcp_recvmsg,
+       .async_recv             = tcp_async_recv,
        .backlog_rcv            = tcp_v4_do_rcv,
        .hash                   = tcp_v4_hash,
        .unhash                 = tcp_unhash,

-- 
        Evgeniy Polyakov

Attachment: naio_vs_sync.png (PNG image)
