This patch starts looking at providing full overlapped I/O support to named pipes. We're not there yet.
It basically starts making the wineserver side of the named pipe code:
- use NT values instead of kernel32
- make use of asynchronous I/O
The rest of the patch is mainly cleanup. It should be a no op change.


A+
--
Eric Pouech
Name:          ntkrnl_53
ChangeLog:     
	- moved named pipe creation to ntdll
	- server now handles the named pipe flags as the NTDLL values (not the KERNEL32 ones)
	- named pipes in server now use the async IO mechanism for connect/wait ops
	- tidy up async IO functions in server to match named pipes' needs
	- removed the no longer used APC_ASYNC kind of APC
License:       X11
GenDate:       2005/03/20 14:17:12 UTC
ModifiedFiles: dlls/kernel/sync.c dlls/ntdll/file.c dlls/ntdll/sync.c include/winternl.h 
server/fd.c server/file.c server/file.h server/named_pipe.c server/protocol.def server/serial.c server/sock.c
AddedFiles:    
RemovedFiles:  
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/dlls/kernel/sync.c,v
retrieving revision 1.71
diff -u -u -r1.71 sync.c
--- dlls/kernel/sync.c	4 Mar 2005 12:38:37 -0000	1.71
+++ dlls/kernel/sync.c	20 Mar 2005 08:28:28 -0000
@@ -56,6 +56,9 @@
 
 #include "wine/debug.h"
 
+#define NONAMELESSUNION
+#define NONAMELESSSTRUCT
+
 WINE_DEFAULT_DEBUG_CHANNEL(sync);
 
 /* check if current version is NT or Win95 */
@@ -1051,15 +1054,20 @@
 HANDLE WINAPI CreateNamedPipeW( LPCWSTR name, DWORD dwOpenMode,
                                 DWORD dwPipeMode, DWORD nMaxInstances,
                                 DWORD nOutBufferSize, DWORD nInBufferSize,
-                                DWORD nDefaultTimeOut, LPSECURITY_ATTRIBUTES attr )
+                                DWORD nDefaultTimeOut, LPSECURITY_ATTRIBUTES sa )
 {
-    HANDLE ret;
+    HANDLE handle;
     UNICODE_STRING nt_name;
-    static const WCHAR leadin[] = {'\\','?','?','\\','P','I','P','E','\\'};
+    OBJECT_ATTRIBUTES attr;
+    DWORD options;
+    BOOLEAN pipe_type, read_mode, non_block;
+    NTSTATUS status;
+    IO_STATUS_BLOCK iosb;
+    LARGE_INTEGER timeout;
 
     TRACE("(%s, %#08lx, %#08lx, %ld, %ld, %ld, %ld, %p)\n",
           debugstr_w(name), dwOpenMode, dwPipeMode, nMaxInstances,
-          nOutBufferSize, nInBufferSize, nDefaultTimeOut, attr );
+          nOutBufferSize, nInBufferSize, nDefaultTimeOut, sa );
 
     if (!RtlDosPathNameToNtPathName_U( name, &nt_name, NULL, NULL ))
     {
@@ -1072,30 +1080,42 @@
         RtlFreeUnicodeString( &nt_name );
         return INVALID_HANDLE_VALUE;
     }
-    if (nt_name.Length < sizeof(leadin) ||
-        strncmpiW( nt_name.Buffer, leadin, sizeof(leadin)/sizeof(leadin[0])))
-    {
-        SetLastError( ERROR_INVALID_NAME );
-        RtlFreeUnicodeString( &nt_name );
-        return INVALID_HANDLE_VALUE;
-    }
-    SERVER_START_REQ( create_named_pipe )
+
+    attr.Length                   = sizeof(attr);
+    attr.RootDirectory            = 0;
+    attr.ObjectName               = &nt_name;
+    attr.Attributes               = (sa && sa->bInheritHandle) ? OBJ_INHERIT : 0;
+    attr.SecurityDescriptor       = sa ? sa->lpSecurityDescriptor : NULL;
+    attr.SecurityQualityOfService = NULL;
+
+    options = 0;
+    if (dwOpenMode & FILE_FLAG_WRITE_THROUGH) options |= FILE_WRITE_THROUGH;
+    if (!(dwOpenMode & FILE_FLAG_OVERLAPPED)) options |= FILE_SYNCHRONOUS_IO_ALERT;
+    if ((dwOpenMode & PIPE_ACCESS_DUPLEX) == PIPE_ACCESS_DUPLEX)
+        options |= FILE_PIPE_FULL_DUPLEX;
+    else if (dwOpenMode & PIPE_ACCESS_INBOUND) options |= FILE_PIPE_INBOUND;
+    else if (dwOpenMode & PIPE_ACCESS_OUTBOUND) options |= FILE_PIPE_OUTBOUND;
+    pipe_type = (dwPipeMode & PIPE_TYPE_MESSAGE) ? TRUE : FALSE;
+    read_mode = (dwPipeMode & PIPE_READMODE_MESSAGE) ? TRUE : FALSE;
+    non_block = (dwPipeMode & PIPE_NOWAIT) ? TRUE : FALSE;
+    if (nMaxInstances >= PIPE_UNLIMITED_INSTANCES) nMaxInstances = ULONG_MAX;
+
+    timeout.QuadPart = (ULONGLONG)nDefaultTimeOut * -10000;
+
+    SetLastError(0);
+        
+    status = NtCreateNamedPipeFile(&handle, 0, &attr, &iosb, 0, FILE_OVERWRITE_IF,
+                                   options, pipe_type, read_mode, non_block, 
+                                   nMaxInstances, nInBufferSize, nOutBufferSize,
+                                   &timeout);
+
+    RtlFreeUnicodeString( &nt_name );
+    if (status)
     {
-        req->openmode = dwOpenMode;
-        req->pipemode = dwPipeMode;
-        req->maxinstances = nMaxInstances;
-        req->outsize = nOutBufferSize;
-        req->insize = nInBufferSize;
-        req->timeout = nDefaultTimeOut;
-        req->inherit = (attr && (attr->nLength>=sizeof(*attr)) && attr->bInheritHandle);
-        wine_server_add_data( req, nt_name.Buffer + 4, nt_name.Length - 4*sizeof(WCHAR) );
-        SetLastError(0);
-        if (!wine_server_call_err( req )) ret = reply->handle;
-        else ret = INVALID_HANDLE_VALUE;
+        handle = INVALID_HANDLE_VALUE;
+        SetLastError( RtlNtStatusToDosError(status) );
     }
-    SERVER_END_REQ;
-    RtlFreeUnicodeString( &nt_name );
-    return ret;
+    return handle;
 }
 
 
@@ -1106,7 +1126,7 @@
                            LPDWORD lpcbRead, LPDWORD lpcbAvail, LPDWORD lpcbMessage )
 {
 #ifdef FIONREAD
-    int avail=0, fd, ret, flags;
+    int avail = 0, fd, ret, flags;
 
     ret = wine_server_handle_to_fd( hPipe, GENERIC_READ, &fd, &flags );
     if (ret)
@@ -1175,17 +1195,21 @@
 }
 
 /***********************************************************************
- *           SYNC_CompletePipeOverlapped   (Internal)
+ *           PIPE_CompletionWait   (Internal)
  */
-static void CALLBACK SYNC_CompletePipeOverlapped (LPOVERLAPPED overlapped, DWORD result)
+static void WINAPI PIPE_CompletionWait(void *user, PIO_STATUS_BLOCK iosb, ULONG status)
 {
-    TRACE("for %p result %08lx\n",overlapped,result);
-    if(!overlapped)
-        return;
-    overlapped->Internal = result;
-    SetEvent(overlapped->hEvent);
-}
+    LPOVERLAPPED        ovlp = (LPOVERLAPPED)user;
 
+    TRACE("for %p/%p, status=%08lx\n", ovlp, iosb, status);
+
+    if (ovlp)
+    {
+        ovlp->Internal = status;
+        SetEvent(ovlp->hEvent);
+    }
+    TRACE("done\n");
+}
 
 /***********************************************************************
  *           WaitNamedPipeA   (KERNEL32.@)
@@ -1215,7 +1239,7 @@
     UNICODE_STRING nt_name;
     static const WCHAR leadin[] = {'\\','?','?','\\','P','I','P','E','\\'};
 
-    TRACE("%s 0x%08lx\n",debugstr_w(name),nTimeOut);
+    TRACE("%s 0x%08lx\n", debugstr_w(name), nTimeOut);
 
     if (!RtlDosPathNameToNtPathName_U( name, &nt_name, NULL, NULL ))
         return FALSE;
@@ -1232,7 +1256,7 @@
         return FALSE;
     }
 
-    memset(&ov,0,sizeof(ov));
+    memset(&ov, 0, sizeof(ov));
     ov.hEvent = CreateEventW( NULL, 0, 0, NULL );
     if (!ov.hEvent)
         return FALSE;
@@ -1241,7 +1265,7 @@
     {
         req->timeout = nTimeOut;
         req->overlapped = &ov;
-        req->func = SYNC_CompletePipeOverlapped;
+        req->func = PIPE_CompletionWait;
         wine_server_add_data( req, nt_name.Buffer + 4, nt_name.Length - 4*sizeof(WCHAR) );
         ret = !wine_server_call_err( req );
     }
@@ -1249,9 +1273,9 @@
 
     RtlFreeUnicodeString( &nt_name );
 
-    if(ret)
+    if (ret)
     {
-        if (WAIT_OBJECT_0==WaitForSingleObject(ov.hEvent,INFINITE))
+        if (WAIT_OBJECT_0 == WaitForSingleObject(ov.hEvent, INFINITE))
         {
             SetLastError(RtlNtStatusToDosError(ov.Internal));
             ret = (ov.Internal==STATUS_SUCCESS);
@@ -1263,63 +1287,50 @@
 
 
 /***********************************************************************
- *           SYNC_ConnectNamedPipe   (Internal)
+ *           ConnectNamedPipe   (KERNEL32.@)
  */
-static BOOL SYNC_ConnectNamedPipe(HANDLE hPipe, LPOVERLAPPED overlapped)
+BOOL WINAPI ConnectNamedPipe(HANDLE hPipe, LPOVERLAPPED overlapped)
 {
-    BOOL ret;
-
-    if(!overlapped)
-        return FALSE;
-
-    overlapped->Internal = STATUS_PENDING;
+    BOOL                ret;
+    LPOVERLAPPED        pov;
+    OVERLAPPED          ov;
+
+    TRACE("(%p,%p)\n", hPipe, overlapped);
+
+    if (!overlapped)
+    {
+        memset(&ov, 0, sizeof(ov));
+        ov.hEvent = CreateEventW(NULL, 0, 0, NULL);
+        if (!ov.hEvent) return FALSE;
+        pov = &ov;
+    }
+    else pov = overlapped;
+        
+    pov->Internal = STATUS_PENDING;
 
     SERVER_START_REQ( connect_named_pipe )
     {
         req->handle = hPipe;
-        req->overlapped = overlapped;
-        req->func = SYNC_CompletePipeOverlapped;
+        req->overlapped = pov;
+        req->func = PIPE_CompletionWait;
         ret = !wine_server_call_err( req );
     }
     SERVER_END_REQ;
 
-    return ret;
-}
-
-/***********************************************************************
- *           ConnectNamedPipe   (KERNEL32.@)
- */
-BOOL WINAPI ConnectNamedPipe(HANDLE hPipe, LPOVERLAPPED overlapped)
-{
-    OVERLAPPED ov;
-    BOOL ret;
-
-    TRACE("(%p,%p)\n",hPipe, overlapped);
-
-    if(overlapped)
+    if (ret)
     {
-        if(SYNC_ConnectNamedPipe(hPipe,overlapped))
+        if (overlapped)
+        {
             SetLastError( ERROR_IO_PENDING );
-        return FALSE;
-    }
-
-    memset(&ov,0,sizeof(ov));
-    ov.hEvent = CreateEventW(NULL,0,0,NULL);
-    if (!ov.hEvent)
-        return FALSE;
-
-    ret=SYNC_ConnectNamedPipe(hPipe, &ov);
-    if(ret)
-    {
-        if (WAIT_OBJECT_0==WaitForSingleObject(ov.hEvent,INFINITE))
+            ret = FALSE;
+        }
+        else
         {
-            SetLastError(RtlNtStatusToDosError(ov.Internal));
-            ret = (ov.Internal==STATUS_SUCCESS);
+            ret = GetOverlappedResult(hPipe, &ov, NULL, TRUE);
         }
     }
 
-    CloseHandle(ov.hEvent);
-
+    if (!overlapped) CloseHandle(ov.hEvent); /* our own event: free it even when the request failed */
     return ret;
 }
 
@@ -1330,7 +1341,7 @@
 {
     BOOL ret;
 
-    TRACE("(%p)\n",hPipe);
+    TRACE("(%p)\n", hPipe);
 
     SERVER_START_REQ( disconnect_named_pipe )
     {
@@ -1389,10 +1400,19 @@
     {
         req->handle = hNamedPipe;
         ret = !wine_server_call_err( req );
-        if(lpFlags) *lpFlags = reply->flags;
-        if(lpOutputBufferSize) *lpOutputBufferSize = reply->outsize;
-        if(lpInputBufferSize) *lpInputBufferSize = reply->outsize;
-        if(lpMaxInstances) *lpMaxInstances = reply->maxinstances;
+        if (lpFlags)
+        {
+            *lpFlags = 0;
+            if (reply->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE)
+                *lpFlags |= PIPE_TYPE_MESSAGE;
+            if (reply->flags & NAMED_PIPE_MESSAGE_STREAM_READ)
+                *lpFlags |= PIPE_READMODE_MESSAGE;
+            if (reply->flags & NAMED_PIPE_NONBLOCKING_MODE)
+                *lpFlags |= PIPE_NOWAIT;
+        }
+        if (lpOutputBufferSize) *lpOutputBufferSize = reply->outsize;
+        if (lpInputBufferSize) *lpInputBufferSize = reply->insize; /* was reply->outsize (copy/paste slip) */
+        if (lpMaxInstances) *lpMaxInstances = reply->maxinstances;
     }
     SERVER_END_REQ;
 
@@ -1459,7 +1479,7 @@
            debugstr_a(lpNamedPipeName), lpInput, dwInputSize,
            lpOutput, dwOutputSize, lpBytesRead, nTimeout);
 
-    if( lpNamedPipeName )
+    if ( lpNamedPipeName )
     {
         len = MultiByteToWideChar( CP_ACP, 0, lpNamedPipeName, -1, NULL, 0 );
         str = HeapAlloc( GetProcessHeap(), 0, len*sizeof(WCHAR) );
@@ -1467,7 +1487,7 @@
     }
     ret = CallNamedPipeW( str, lpInput, dwInputSize, lpOutput,
                           dwOutputSize, lpBytesRead, nTimeout );
-    if( lpNamedPipeName )
+    if ( lpNamedPipeName )
         HeapFree( GetProcessHeap(), 0, str );
 
     return ret;
@@ -1541,7 +1561,7 @@
     TRACE("%s %ld %ld %p\n", debugstr_a(lpName),
           nMaxMessageSize, lReadTimeout, sa);
 
-    if( lpName )
+    if ( lpName )
     {
         len = MultiByteToWideChar( CP_ACP, 0, lpName, -1, NULL, 0 );
         name = HeapAlloc( GetProcessHeap(), 0, len*sizeof(WCHAR) );
@@ -1601,7 +1621,7 @@
                                LPDWORD lpNextSize, LPDWORD lpMessageCount,
                                LPDWORD lpReadTimeout )
 {
-    FIXME("(%p): stub\n",hMailslot);
+    FIXME("(%p): stub\n", hMailslot);
     SetLastError(ERROR_CALL_NOT_IMPLEMENTED);
     return FALSE;
 }
@@ -1674,11 +1694,11 @@
 
     TRACE("%p %s\n", attr, debugstr_a(name) );
 
-    if( name )
+    if ( name )
     {
         len = MultiByteToWideChar( CP_ACP, 0, name, -1, NULL, 0 );
         str = HeapAlloc( GetProcessHeap(), 0, len*sizeof(WCHAR) );
-        if( !str )
+        if ( !str )
         {
             SetLastError( ERROR_OUTOFMEMORY );
             return 0;
Index: dlls/ntdll/file.c
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/dlls/ntdll/file.c,v
retrieving revision 1.77
diff -u -u -r1.77 file.c
--- dlls/ntdll/file.c	16 Mar 2005 19:48:42 -0000	1.77
+++ dlls/ntdll/file.c	19 Mar 2005 13:49:07 -0000
@@ -1677,19 +1677,49 @@
  *
  *
  */
-NTSTATUS WINAPI NtCreateNamedPipeFile( PHANDLE FileHandle, ULONG DesiredAccess,
-     POBJECT_ATTRIBUTES ObjectAttributes, PIO_STATUS_BLOCK IoStatusBlock,
-     ULONG ShareAccess, ULONG CreateDisposition, ULONG CreateOptions,
-     ULONG NamedPipeType, ULONG ReadMode, ULONG CompletionMode,
-     ULONG MaximumInstances, ULONG InboundQuota, ULONG OutboundQuota,
-     PLARGE_INTEGER DefaultTimeout)
+NTSTATUS WINAPI NtCreateNamedPipeFile( PHANDLE handle, ULONG access,
+                                       POBJECT_ATTRIBUTES oa, PIO_STATUS_BLOCK iosb,
+                                       ULONG sharing, ULONG dispo, ULONG options,
+                                       ULONG pipe_type, ULONG read_mode, 
+                                       ULONG completion_mode, ULONG max_inst,
+                                       ULONG inbound_quota, ULONG outbound_quota,
+                                       PLARGE_INTEGER timeout)
 {
-    FIXME("(%p %lx %p %p %lx %ld %lx %ld %ld %ld %ld %ld %ld %p): stub\n",
-          FileHandle, DesiredAccess, ObjectAttributes, IoStatusBlock,
-          ShareAccess, CreateDisposition, CreateOptions, NamedPipeType,
-          ReadMode, CompletionMode, MaximumInstances, InboundQuota,
-          OutboundQuota, DefaultTimeout);
-    return STATUS_NOT_IMPLEMENTED;
+    NTSTATUS    status;
+    static const WCHAR leadin[] = {'\\','?','?','\\','P','I','P','E','\\'};
+
+    TRACE("(%p %lx %p %p %lx %ld %lx %ld %ld %ld %ld %ld %ld %p)\n",
+          handle, access, oa, iosb, sharing, dispo, options, pipe_type,
+          read_mode, completion_mode, max_inst, inbound_quota, outbound_quota,
+          timeout);
+
+    if (oa->ObjectName->Length < sizeof(leadin) ||
+        strncmpiW( oa->ObjectName->Buffer, leadin, sizeof(leadin)/sizeof(leadin[0]) ))
+        return STATUS_OBJECT_NAME_INVALID;
+    /* assume we only get relative timeout, and storable in a DWORD as ms */
+    if (timeout->QuadPart > 0 || (timeout->QuadPart / -10000) >> 32)
+        FIXME("Wrong time %s\n", wine_dbgstr_longlong(timeout->QuadPart));
+
+    SERVER_START_REQ( create_named_pipe )
+    {
+        req->options = options; /* FIXME not used in server yet !!!! */
+        /* each ?: must be parenthesized: '|' binds tighter than the conditional operator */
+        req->flags =
+            ((pipe_type) ? NAMED_PIPE_MESSAGE_STREAM_WRITE : 0) |
+            ((read_mode) ? NAMED_PIPE_MESSAGE_STREAM_READ  : 0) |
+            ((completion_mode) ? NAMED_PIPE_NONBLOCKING_MODE  : 0);
+        req->maxinstances = max_inst;
+        req->outsize = outbound_quota;
+        req->insize  = inbound_quota;
+        req->timeout = timeout->QuadPart / -10000;
+        req->inherit = (oa->Attributes & OBJ_INHERIT) != 0;
+        wine_server_add_data( req, oa->ObjectName->Buffer + 4,
+                              oa->ObjectName->Length - 4 * sizeof(WCHAR) );
+        status = wine_server_call( req );
+        if (!status) *handle = reply->handle;
+    }
+    SERVER_END_REQ;
+    return status;
 }
 
 /******************************************************************
Index: dlls/ntdll/sync.c
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/dlls/ntdll/sync.c,v
retrieving revision 1.43
diff -u -u -r1.43 sync.c
--- dlls/ntdll/sync.c	4 Mar 2005 12:38:37 -0000	1.43
+++ dlls/ntdll/sync.c	20 Mar 2005 08:18:47 -0000
@@ -75,8 +75,8 @@
                                    IN LONG InitialCount,
                                    IN LONG MaximumCount )
 {
-    DWORD len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
-    NTSTATUS ret;
+    DWORD       len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
+    NTSTATUS    ret;
 
     if (MaximumCount <= 0 || InitialCount < 0 || InitialCount > MaximumCount)
         return STATUS_INVALID_PARAMETER;
@@ -103,8 +103,8 @@
                                  IN ACCESS_MASK access,
                                  IN const OBJECT_ATTRIBUTES *attr )
 {
-    DWORD len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
-    NTSTATUS ret;
+    DWORD       len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
+    NTSTATUS    ret;
 
     if (len >= MAX_PATH * sizeof(WCHAR)) return STATUS_NAME_TOO_LONG;
 
@@ -169,8 +169,8 @@
 	IN BOOLEAN ManualReset,
 	IN BOOLEAN InitialState)
 {
-    DWORD len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
-    NTSTATUS ret;
+    DWORD       len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
+    NTSTATUS    ret;
 
     if (len >= MAX_PATH * sizeof(WCHAR)) return STATUS_NAME_TOO_LONG;
 
@@ -197,8 +197,8 @@
 	IN ACCESS_MASK DesiredAccess,
 	IN const OBJECT_ATTRIBUTES *attr )
 {
-    DWORD len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
-    NTSTATUS ret;
+    DWORD       len = attr && attr->ObjectName ? attr->ObjectName->Length : 0;
+    NTSTATUS    ret;
 
     if (len >= MAX_PATH * sizeof(WCHAR)) return STATUS_NAME_TOO_LONG;
 
@@ -674,9 +674,6 @@
         {
         case APC_NONE:
             return;  /* no more APCs */
-        case APC_ASYNC:
-            proc( arg1, arg2 );
-            break;
         case APC_USER:
             proc( arg1, arg2, arg3 );
             break;
Index: include/winternl.h
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/include/winternl.h,v
retrieving revision 1.109
diff -u -u -r1.109 winternl.h
--- include/winternl.h	15 Feb 2005 20:47:24 -0000	1.109
+++ include/winternl.h	26 Feb 2005 18:25:19 -0000
@@ -1283,6 +1300,11 @@
 #define FILE_AUTOGENERATED_DEVICE_NAME  0x00000080
 #define FILE_DEVICE_SECURE_OPEN         0x00000100
 
+/* options for NtCreateNamedPipeFile */
+#define FILE_PIPE_INBOUND               0x00000000
+#define FILE_PIPE_OUTBOUND              0x00000001
+#define FILE_PIPE_FULL_DUPLEX           0x00000002
+
 #if (_WIN32_WINNT >= 0x0501)
 #define INTERNAL_TS_ACTIVE_CONSOLE_ID ( *((volatile ULONG*)(0x7ffe02d8)) )
 #endif /* (_WIN32_WINNT >= 0x0501) */
Index: server/fd.c
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/server/fd.c,v
retrieving revision 1.35
diff -u -u -r1.35 fd.c
--- server/fd.c	4 Mar 2005 12:38:37 -0000	1.35
+++ server/fd.c	19 Mar 2005 17:42:06 -0000
@@ -251,7 +251,8 @@
 static struct list timeout_list = LIST_INIT(timeout_list);   /* sorted timeouts list */
 
 /* add a timeout user */
-struct timeout_user *add_timeout_user( struct timeval *when, timeout_callback func, void *private )
+struct timeout_user *add_timeout_user( const struct timeval *when, timeout_callback func,
+                                       void *private )
 {
     struct timeout_user *user;
     struct list *ptr;
@@ -987,12 +988,10 @@
 
 struct async
 {
-    struct fd           *fd;
     struct thread       *thread;
     void                *apc;
     void                *user;
     void                *sb;
-    struct timeval       when;
     struct timeout_user *timeout;
     struct list          entry;
 };
@@ -1022,14 +1021,13 @@
 }
 
 /* create an async on a given queue of a fd */
-struct async *create_async(struct fd *fd, struct thread *thread, int timeout, struct list *queue,
+struct async *create_async(struct thread *thread, int* timeout, struct list *queue,
                            void *io_apc, void *io_user, void* io_sb)
 {
     struct async *async = mem_alloc( sizeof(struct async) );
 
     if (!async) return NULL;
 
-    async->fd = fd;
     async->thread = (struct thread *)grab_object(thread);
     async->apc = io_apc;
     async->user = io_user;
@@ -1039,9 +1037,11 @@
 
     if (timeout)
     {
-        gettimeofday( &async->when, 0 );
-        add_timeout( &async->when, timeout );
-        async->timeout = add_timeout_user( &async->when, async_callback, async );
+        struct timeval       when;
+
+        gettimeofday( &when, 0 );
+        add_timeout( &when, *timeout );
+        async->timeout = add_timeout_user( &when, async_callback, async );
     }
     else async->timeout = NULL;
 
Index: server/file.c
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/server/file.c,v
retrieving revision 1.91
diff -u -u -r1.91 file.c
--- server/file.c	4 Mar 2005 12:38:37 -0000	1.91
+++ server/file.c	19 Mar 2005 17:27:28 -0000
@@ -296,7 +296,7 @@
         return;
     }
 
-    if (!create_async( fd, current, 0, queue, apc, user, iosb ))
+    if (!create_async( current, 0, queue, apc, user, iosb ))
         return;
 
     /* Check if the new pending request can be served immediately */
Index: server/file.h
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/server/file.h,v
retrieving revision 1.19
diff -u -u -r1.19 file.h
--- server/file.h	24 Feb 2005 17:06:31 -0000	1.19
+++ server/file.h	19 Mar 2005 17:42:22 -0000
@@ -79,12 +79,12 @@
 
 typedef void (*timeout_callback)( void *private );
 
-extern struct timeout_user *add_timeout_user( struct timeval *when,
+extern struct timeout_user *add_timeout_user( const struct timeval *when,
                                               timeout_callback func, void *private );
 extern void remove_timeout_user( struct timeout_user *user );
 extern void add_timeout( struct timeval *when, int timeout );
 /* return 1 if t1 is before t2 */
-static inline int time_before( struct timeval *t1, struct timeval *t2 )
+static inline int time_before( const struct timeval *t1, const struct timeval *t2 )
 {
     return ((t1->tv_sec < t2->tv_sec) ||
             ((t1->tv_sec == t2->tv_sec) && (t1->tv_usec < t2->tv_usec)));
@@ -111,7 +111,7 @@
 extern struct object *create_serial( struct fd *fd, unsigned int options );
 
 /* async I/O functions */
-extern struct async *create_async( struct fd *fd, struct thread *thread, int timeout,
+extern struct async *create_async( struct thread *thread, int* timeout,
                                    struct list *queue, void *, void *, void *);
 extern void async_terminate_head( struct list *queue, int status );
 
Index: server/named_pipe.c
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/server/named_pipe.c,v
retrieving revision 1.35
diff -u -u -r1.35 named_pipe.c
--- server/named_pipe.c	4 Mar 2005 12:38:37 -0000	1.35
+++ server/named_pipe.c	19 Mar 2005 20:26:39 -0000
@@ -60,13 +60,6 @@
     ps_wait_connect
 };
 
-struct wait_info
-{
-    struct thread       *thread;
-    void                *func;
-    void                *overlapped;
-};
-
 struct named_pipe;
 
 struct pipe_server
@@ -79,7 +72,7 @@
     struct named_pipe   *pipe;
     struct timeout_user *flush_poll;
     struct event        *event;
-    struct wait_info     wait;
+    struct list          wait_q;     /* only a single one can be queued */
 };
 
 struct pipe_client
@@ -87,14 +80,6 @@
     struct object        obj;        /* object header */
     struct fd           *fd;         /* pipe file descriptor */
     struct pipe_server  *server;     /* server that this client is connected to */
-    struct wait_info     wait;
-};
-
-struct connect_wait
-{
-    struct list          entry;      /* entry in named pipe wait list */
-    struct wait_info     wait;
-    struct timeout_user *timeout_user;
 };
 
 struct named_pipe
@@ -107,7 +92,7 @@
     unsigned int        timeout;
     unsigned int        instances;
     struct list         servers;     /* list of servers using this pipe */
-    struct list         waiters;     /* list of clients waiting to connect */
+    struct list         waiters_q;   /* list of clients waiting to connect */
 };
 
 static void named_pipe_dump( struct object *obj, int verbose );
@@ -208,89 +193,19 @@
     fprintf( stderr, "Named pipe client server=%p\n", client->server );
 }
 
-static void notify_waiter( struct wait_info *wait, unsigned int status )
-{
-    if( wait->thread && wait->func && wait->overlapped )
-    {
-        /* queue a system APC, to notify a waiting thread */
-        thread_queue_apc( wait->thread, NULL, wait->func, APC_ASYNC,
-                          1, wait->overlapped, (void *)status, NULL );
-    }
-    if( wait->thread ) release_object( wait->thread );
-    wait->thread = NULL;
-}
-
-static void set_waiter( struct wait_info *wait, void *func, void *ov )
-{
-    wait->thread = (struct thread *) grab_object( current );
-    wait->func = func;
-    wait->overlapped = ov;
-}
-
-static void notify_connect_waiter( struct connect_wait *waiter, unsigned int status )
-{
-    notify_waiter( &waiter->wait, status );
-    list_remove( &waiter->entry );
-    free( waiter );
-}
-
-static void notify_all_connect_waiters( struct named_pipe *pipe, unsigned int status )
-{
-    struct list *ptr;
-
-    while ((ptr = list_head( &pipe->waiters )) != NULL)
-    {
-        struct connect_wait *waiter = LIST_ENTRY( ptr, struct connect_wait, entry );
-        if (waiter->timeout_user) remove_timeout_user( waiter->timeout_user );
-        notify_connect_waiter( waiter, status );
-    }
-}
-
-/* pipe connect wait timeout */
-static void connect_timeout( void *ptr )
-{
-    struct connect_wait *waiter = (struct connect_wait *)ptr;
-    notify_connect_waiter( waiter, STATUS_TIMEOUT );
-}
-
 static void named_pipe_destroy( struct object *obj)
 {
     struct named_pipe *pipe = (struct named_pipe *) obj;
 
     assert( list_empty( &pipe->servers ) );
     assert( !pipe->instances );
-    notify_all_connect_waiters( pipe, STATUS_HANDLES_CLOSED );
-}
-
-static void queue_connect_waiter( struct named_pipe *pipe, void *func,
-                                  void *overlapped, unsigned int *timeout )
-{
-    struct connect_wait *waiter;
-
-    waiter = mem_alloc( sizeof(*waiter) );
-    if( waiter )
-    {
-        struct timeval tv;
-
-        set_waiter( &waiter->wait, func, overlapped );
-        list_add_tail( &pipe->waiters, &waiter->entry );
-
-        if (timeout)
-        {
-            gettimeofday( &tv, 0 );
-            add_timeout( &tv, *timeout );
-            waiter->timeout_user = add_timeout_user( &tv, connect_timeout,
-                                                     waiter );
-        }
-        else
-            waiter->timeout_user = NULL;
-    }
+    async_terminate_queue( &pipe->waiters_q, STATUS_HANDLES_CLOSED );
 }
 
 static struct fd *pipe_client_get_fd( struct object *obj )
 {
     struct pipe_client *client = (struct pipe_client *) obj;
-    if( client->fd )
+    if ( client->fd )
         return (struct fd *) grab_object( client->fd );
     set_error( STATUS_PIPE_DISCONNECTED );
     return NULL;
@@ -300,7 +215,7 @@
 {
     struct pipe_server *server = (struct pipe_server *) obj;
 
-    switch(server->state)
+    switch (server->state)
     {
     case ps_connected_server:
     case ps_wait_disconnect:
@@ -323,7 +238,7 @@
 
 static void notify_empty( struct pipe_server *server )
 {
-    if( !server->flush_poll )
+    if ( !server->flush_poll )
         return;
     assert( server->state == ps_connected_server );
     assert( server->event );
@@ -337,7 +252,7 @@
 static void do_disconnect( struct pipe_server *server )
 {
     /* we may only have a server fd, if the client disconnected */
-    if( server->client )
+    if ( server->client )
     {
         assert( server->client->server == server );
         assert( server->client->fd );
@@ -355,19 +270,19 @@
 
     assert( obj->ops == &pipe_server_ops );
 
-    if( server->fd )
+    if ( server->fd )
     {
         notify_empty( server );
         do_disconnect( server );
     }
 
-    if( server->client )
+    if ( server->client )
     {
         server->client->server = NULL;
         server->client = NULL;
     }
 
-    notify_waiter( &server->wait, STATUS_HANDLES_CLOSED );
+    async_terminate_head( &server->wait_q, STATUS_HANDLES_CLOSED );
 
     assert( server->pipe->instances );
     server->pipe->instances--;
@@ -383,13 +298,11 @@
 
     assert( obj->ops == &pipe_client_ops );
 
-    notify_waiter( &client->wait, STATUS_HANDLES_CLOSED );
-
-    if( server )
+    if ( server )
     {
         notify_empty( server );
 
-        switch( server->state )
+        switch ( server->state )
         {
         case ps_connected_server:
             /* Don't destroy the server's fd here as we can't
@@ -427,13 +340,13 @@
     assert( server->client );
 
     fd = get_unix_fd( server->client->fd );
-    if( fd < 0 )
+    if ( fd < 0 )
         return 0;
     pfd.fd = fd;
     pfd.events = POLLIN;
     pfd.revents = 0;
 
-    if( 0 > poll( &pfd, 1, 0 ) )
+    if ( 0 > poll( &pfd, 1, 0 ) )
         return 0;
  
     return pfd.revents&POLLIN;
@@ -444,7 +357,7 @@
     struct pipe_server *server = (struct pipe_server*) arg;
 
     assert( server->event );
-    if( pipe_data_remaining( server ) )
+    if ( pipe_data_remaining( server ) )
     {
         struct timeval tv;
 
@@ -466,25 +379,25 @@
 {
     struct pipe_server *server = get_fd_user( fd );
 
-    if( !server )
+    if ( !server )
         return 0;
 
-    if( server->state != ps_connected_server )
+    if ( server->state != ps_connected_server )
         return 0;
 
     /* FIXME: if multiple threads flush the same pipe,
               maybe should create a list of processes to notify */
-    if( server->flush_poll )
+    if ( server->flush_poll )
         return 0;
 
-    if( pipe_data_remaining( server ) )
+    if ( pipe_data_remaining( server ) )
     {
         struct timeval tv;
 
         /* this kind of sux - 
            there's no unix way to be alerted when a pipe becomes empty */
         server->event = create_event( NULL, 0, 0, 0 );
-        if( !server->event )
+        if ( !server->event )
             return 0;
         gettimeofday( &tv, 0 );
         add_timeout( &tv, 100 );
@@ -511,14 +424,14 @@
     struct named_pipe *pipe;
 
     pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len );
-    if( pipe )
+    if ( pipe )
     {
-        if( get_error() != STATUS_OBJECT_NAME_COLLISION )
+        if ( get_error() != STATUS_OBJECT_NAME_COLLISION )
         {
             /* initialize it if it didn't already exist */
             pipe->instances = 0;
             list_init( &pipe->servers );
-            list_init( &pipe->waiters );
+            list_init( &pipe->waiters_q );
         }
     }
     return pipe;
@@ -552,7 +465,7 @@
     struct pipe_server *server;
 
     server = alloc_object( &pipe_server_ops );
-    if( !server )
+    if ( !server )
         return NULL;
 
     server->fd = NULL;
@@ -560,7 +473,7 @@
     server->state = ps_idle_server;
     server->client = NULL;
     server->flush_poll = NULL;
-    server->wait.thread = NULL;
+    list_init( &server->wait_q );
 
     list_add_head( &pipe->servers, &server->entry );
     grab_object( pipe );
@@ -573,12 +486,11 @@
     struct pipe_client *client;
 
     client = alloc_object( &pipe_client_ops );
-    if( !client )
+    if ( !client )
         return NULL;
 
     client->fd = NULL;
     client->server = server;
-    client->wait.thread = NULL;
 
     return client;
 }
@@ -589,7 +501,8 @@
 
     LIST_FOR_EACH_ENTRY( server, &pipe->servers, struct pipe_server, entry )
     {
-        if (server->state == state) return (struct pipe_server *)grab_object( server );
+        if (server->state == state)
+            return (struct pipe_server *)grab_object( server );
     }
     return NULL;
 }
@@ -614,29 +527,29 @@
 
     reply->handle = 0;
     pipe = create_named_pipe( get_req_data(), get_req_data_size() );
-    if( !pipe )
+    if ( !pipe )
         return;
 
-    if( get_error() != STATUS_OBJECT_NAME_COLLISION )
+    if ( get_error() != STATUS_OBJECT_NAME_COLLISION )
     {
         pipe->insize = req->insize;
         pipe->outsize = req->outsize;
         pipe->maxinstances = req->maxinstances;
         pipe->timeout = req->timeout;
-        pipe->pipemode = req->pipemode;
+        pipe->pipemode = req->flags;
     }
     else
     {
         set_error( 0 );  /* clear the name collision */
-        if( pipe->maxinstances <= pipe->instances )
+        if ( pipe->maxinstances <= pipe->instances )
         {
             set_error( STATUS_PIPE_BUSY );
             release_object( pipe );
             return;
         }
-        if( ( pipe->maxinstances != req->maxinstances ) ||
+        if ( ( pipe->maxinstances != req->maxinstances ) ||
             ( pipe->timeout != req->timeout ) ||
-            ( pipe->pipemode != req->pipemode ) )
+            ( pipe->pipemode != req->flags ) )
         {
             set_error( STATUS_ACCESS_DENIED );
             release_object( pipe );
@@ -645,7 +558,7 @@
     }
 
     server = create_pipe_server( pipe );
-    if(server)
+    if (server)
     {
         reply->handle = alloc_handle( current->process, server,
                                       GENERIC_READ|GENERIC_WRITE, req->inherit );
@@ -680,9 +593,9 @@
     }
 
     client = create_pipe_client( server );
-    if( client )
+    if ( client )
     {
-        if( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) )
+        if ( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) )
         {
             assert( !client->fd );
             assert( !server->fd );
@@ -692,9 +605,9 @@
                                             fds[0], &server->obj );
             if (client->fd && server->fd)
             {
-                if( server->state == ps_wait_open )
-                    notify_waiter( &server->wait, STATUS_SUCCESS );
-                assert( !server->wait.thread );
+                if ( server->state == ps_wait_open )
+                    async_terminate_head( &server->wait_q, STATUS_SUCCESS );
+                assert( list_empty( &server->wait_q ) );
                 server->state = ps_connected_server;
                 server->client = client;
                 client->server = server;
@@ -715,17 +628,18 @@
     struct pipe_server *server;
 
     server = get_pipe_server_obj(current->process, req->handle, 0);
-    if(!server)
+    if (!server)
         return;
 
-    switch( server->state )
+    switch ( server->state )
     {
     case ps_idle_server:
     case ps_wait_connect:
         assert( !server->fd );
         server->state = ps_wait_open;
-        set_waiter( &server->wait, req->func, req->overlapped );
-        notify_all_connect_waiters( server->pipe, STATUS_SUCCESS );
+        create_async( current, NULL, &server->wait_q,
+                      req->func, req->overlapped, NULL );
+        async_terminate_queue( &server->pipe->waiters_q, STATUS_SUCCESS );
         break;
     case ps_connected_server:
         assert( server->fd );
@@ -756,26 +670,28 @@
         return;
     }
     server = find_server( pipe, ps_wait_open );
-    if( server )
+    if ( server )
     {
         /* there's already a server waiting for a client to connect */
-        struct wait_info wait;
-        set_waiter( &wait, req->func, req->overlapped );
-        notify_waiter( &wait, STATUS_SUCCESS );
+        thread_queue_apc( current, NULL, req->func, APC_ASYNC_IO,
+                          1, req->overlapped, NULL, (void *)STATUS_SUCCESS );
         release_object( server );
     }
     else
     {
         unsigned int timeout;
+        
         if (req->timeout == NMPWAIT_USE_DEFAULT_WAIT)
             timeout = pipe->timeout;
         else
             timeout = req->timeout;
 
         if (req->timeout == NMPWAIT_WAIT_FOREVER)
-            queue_connect_waiter( pipe, req->func, req->overlapped, NULL );
+            create_async( current, NULL, &pipe->waiters_q,
+                          req->func, req->overlapped, NULL );
         else
-            queue_connect_waiter( pipe, req->func, req->overlapped, &timeout );
+            create_async( current, &timeout, &pipe->waiters_q,
+                          req->func, req->overlapped, NULL );
     }
 
     release_object( pipe );
@@ -787,9 +703,9 @@
 
     reply->fd = -1;
     server = get_pipe_server_obj( current->process, req->handle, 0 );
-    if( !server )
+    if ( !server )
         return;
-    switch( server->state )
+    switch ( server->state )
     {
     case ps_connected_server:
         assert( server->fd );
@@ -797,7 +713,6 @@
         assert( server->client->fd );
 
         notify_empty( server );
-        notify_waiter( &server->client->wait, STATUS_PIPE_DISCONNECTED );
 
         /* Dump the client and server fds, but keep the pointers
            around - client loses all waiting data */
@@ -829,7 +744,7 @@
     struct pipe_server *server;
 
     server = get_pipe_server_obj( current->process, req->handle, 0 );
-    if(!server)
+    if (!server)
         return;
 
     reply->flags        = server->pipe->pipemode;
Index: server/protocol.def
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/server/protocol.def,v
retrieving revision 1.122
diff -u -u -r1.122 protocol.def
--- server/protocol.def	17 Mar 2005 19:10:41 -0000	1.122
+++ server/protocol.def	20 Mar 2005 08:11:57 -0000
@@ -416,7 +416,7 @@
     void*        arg2;
     void*        arg3;
 @END
-enum apc_type { APC_NONE, APC_USER, APC_TIMER, APC_ASYNC, APC_ASYNC_IO };
+enum apc_type { APC_NONE, APC_USER, APC_TIMER, APC_ASYNC_IO };
 
 
 /* Close a handle for the current process */
@@ -1641,8 +1659,8 @@
 
 /* Create a named pipe */
 @REQ(create_named_pipe)
-    unsigned int   openmode;
-    unsigned int   pipemode;
+    unsigned int   options;
+    unsigned int   flags;
     unsigned int   maxinstances;
     unsigned int   outsize;
     unsigned int   insize;
@@ -1654,6 +1672,12 @@
 @END
 
 
+/* flags in create_named_pipe and get_named_pipe_info */
+#define NAMED_PIPE_MESSAGE_STREAM_WRITE 0x0001
+#define NAMED_PIPE_MESSAGE_STREAM_READ  0x0002
+#define NAMED_PIPE_NONBLOCKING_MODE     0x0004
+
+
 /* Open an existing named pipe */
 @REQ(open_named_pipe)
     unsigned int   access;
Index: server/serial.c
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/server/serial.c,v
retrieving revision 1.38
diff -u -u -r1.38 serial.c
--- server/serial.c	4 Mar 2005 12:38:37 -0000	1.38
+++ server/serial.c	19 Mar 2005 17:29:08 -0000
@@ -267,7 +267,7 @@
         return;
     }
 
-    if (!create_async( fd, current, timeout, queue, apc, user, iosb ))
+    if (!create_async( current, &timeout, queue, apc, user, iosb ))
         return;
 
     /* Check if the new pending request can be served immediately */
Index: server/sock.c
===================================================================
RCS file: /home/cvs/cvsroot/wine/wine/server/sock.c,v
retrieving revision 1.52
diff -u -u -r1.52 sock.c
--- server/sock.c	24 Feb 2005 17:06:31 -0000	1.52
+++ server/sock.c	19 Mar 2005 17:27:41 -0000
@@ -528,7 +528,7 @@
     }
     else
     {
-        if (!create_async( fd, current, 0, queue, apc, user, iosb ))
+        if (!create_async( current, 0, queue, apc, user, iosb ))
             return;
     }
 

Reply via email to