The previous version of the completion channel was racy and would
occasionally lose events, causing users to block indefinitely
if no new events occurred.  The surest fix for this is to add
a thread to the completion manager that reaps events from an I/O
completion port and dispatches them to the correct completion
channel.  This results in a 1-2% performance hit in libibverbs
bandwidth tests that wait on a CQ, but it actually works.

Signed-off-by: Sean Hefty <[email protected]>
---
As a reminder, the completion manager / channel abstraction is used to
simulate the select/poll functionality available across multiple FDs on Linux.

diff -up -r -X \mshefty\scm\winof\trunk\docs\dontdiff.txt -I '\$Id:' trunk\etc/user/comp_channel.cpp branches\winverbs\etc/user/comp_channel.cpp
--- trunk\etc/user/comp_channel.cpp     2009-03-10 02:11:36.546875000 -0700
+++ branches\winverbs\etc/user/comp_channel.cpp 2009-04-10 11:57:38.534233100 -0700
@@ -28,29 +28,88 @@
  */
 
 #include <comp_channel.h>
+#include <process.h>
 
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry);
 static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
 
+
+/*
+ * Completion manager
+ */
+
+static unsigned __stdcall CompThreadPoll(void *Context)
+{
+       COMP_MANAGER *mgr = (COMP_MANAGER *) Context;
+       COMP_ENTRY *entry;
+       OVERLAPPED *overlap;
+       DWORD bytes;
+       ULONG_PTR key;
+
+       while (mgr->Run) {
+               GetQueuedCompletionStatus(mgr->CompQueue, &bytes, &key,
+                                                                 &overlap, 
INFINITE);
+               entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);
+
+               if (entry->Channel) {
+                       CompChannelQueue(entry->Channel, entry);
+               } else {
+                       CompManagerQueue(mgr, entry);
+               }
+       }
+
+       _endthreadex(0);
+       return 0;
+}
+
 DWORD CompManagerOpen(COMP_MANAGER *pMgr)
 {
+       DWORD ret;
+
+       InitializeCriticalSection(&pMgr->Lock);
+       pMgr->Busy = 0;
+       DListInit(&pMgr->DoneList);
+       CompEntryInit(NULL, &pMgr->Entry);
+
        pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 
-1);
        if (pMgr->CompQueue == NULL) {
-               return GetLastError();
+               ret = GetLastError();
+               goto err1;
        }
 
        pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL);
        if (pMgr->Event == NULL) {
-               return GetLastError();
+               ret = GetLastError();
+               goto err2;
        }
 
-       pMgr->Lock = 0;
+       pMgr->Run = TRUE;
+       pMgr->Thread = (HANDLE) _beginthreadex(NULL, 0, CompThreadPoll, pMgr, 
0, NULL);
+       if (pMgr->Thread == NULL) {
+               ret = GetLastError();
+               goto err3;
+       }
        return 0;
+
+err3:
+       CloseHandle(pMgr->Event);
+err2:
+       CloseHandle(pMgr->CompQueue);
+err1:
+       DeleteCriticalSection(&pMgr->Lock);     
+       return ret;
 }
 
 void CompManagerClose(COMP_MANAGER *pMgr)
 {
+       pMgr->Run = FALSE;
+       CompManagerCancel(pMgr);
+       WaitForSingleObject(pMgr->Thread, INFINITE);
+       CloseHandle(pMgr->Thread);
+
        CloseHandle(pMgr->CompQueue);
        CloseHandle(pMgr->Event);
+       DeleteCriticalSection(&pMgr->Lock);     
 }
 
 DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)
@@ -61,38 +120,85 @@ DWORD CompManagerMonitor(COMP_MANAGER *p
        return (cq == NULL) ? GetLastError() : 0;
 }
 
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)
+{
+       EnterCriticalSection(&pMgr->Lock);
+       DListInsertTail(&pEntry->MgrEntry, &pMgr->DoneList);
+       SetEvent(pMgr->Event);
+       LeaveCriticalSection(&pMgr->Lock);
+}
+
+static void CompManagerRemoveEntry(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)
+{
+       EnterCriticalSection(&pMgr->Lock);
+       DListRemove(&pEntry->MgrEntry);
+       LeaveCriticalSection(&pMgr->Lock);
+}
+
 DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
                                          COMP_CHANNEL **ppChannel)
 {
        COMP_ENTRY *entry;
-       OVERLAPPED *overlap;
-       DWORD bytes, ret;
-       ULONG_PTR key;
+       DWORD ret = 0;
 
-       if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap,
-                                                                 
Milliseconds)) {
-               entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);
-               *ppChannel = entry->Channel;
-               CompChannelQueue(entry->Channel, entry);
-               ret = 0;
-       } else {
-               ret = GetLastError();
+       EnterCriticalSection(&pMgr->Lock);
+       while (DListEmpty(&pMgr->DoneList)) {
+               ResetEvent(pMgr->Event);
+               LeaveCriticalSection(&pMgr->Lock);
+       
+               ret = WaitForSingleObject(pMgr->Event, Milliseconds);
+               if (ret) {
+                       return ret;
+               }
+
+               EnterCriticalSection(&pMgr->Lock);
        }
+
+       entry = CONTAINING_RECORD(pMgr->DoneList.Next, COMP_ENTRY, MgrEntry);
+       *ppChannel = entry->Channel;
+       if (entry->Channel == NULL) {
+               DListRemove(&entry->MgrEntry);
+               InterlockedExchange(&entry->Busy, 0);
+               ret = ERROR_CANCELLED;
+       }
+       LeaveCriticalSection(&pMgr->Lock);
+
        return ret;
 }
 
+void CompManagerCancel(COMP_MANAGER *pMgr)
+{
+       if (InterlockedCompareExchange(&pMgr->Entry.Busy, 1, 0) == 0) {
+               PostQueuedCompletionStatus(pMgr->CompQueue, 0, (ULONG_PTR) pMgr,
+                                                                  
&pMgr->Entry.Overlap);
+       }
+}
+
 
-void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD 
Milliseconds)
+/*
+ * Completion channel
+ */
+
+DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD 
Milliseconds)
 {
        pChannel->Manager = pMgr;
        pChannel->Head = NULL;
        pChannel->TailPtr = &pChannel->Head;
-       InitializeCriticalSection(&pChannel->Lock);
        pChannel->Milliseconds = Milliseconds;
+
+       pChannel->Event = CreateEvent(NULL, TRUE, TRUE, NULL);
+       if (pChannel->Event == NULL) {
+               return GetLastError();
+       }
+
+       InitializeCriticalSection(&pChannel->Lock);
+       CompEntryInit(pChannel, &pChannel->Entry);
+       return 0;
 }
 
 void CompChannelCleanup(COMP_CHANNEL *pChannel)
 {
+       CloseHandle(pChannel->Event);
        DeleteCriticalSection(&pChannel->Lock); 
 }
 
@@ -114,84 +220,101 @@ static COMP_ENTRY *CompChannelRemoveHead
        return entry;
 }
 
+static COMP_ENTRY *CompChannelFindRemove(COMP_CHANNEL *pChannel, COMP_ENTRY 
*pEntry)
+{
+       COMP_ENTRY **entry_ptr, *entry;
+
+       EnterCriticalSection(&pChannel->Lock);
+       entry_ptr = &pChannel->Head;
+       while (*entry_ptr && *entry_ptr != pEntry) {
+               entry_ptr = &(*entry_ptr)->Next;
+       }
+
+       if (*entry_ptr != NULL) {
+               *entry_ptr = pEntry->Next;
+               if (pChannel->TailPtr == &pEntry->Next) {
+                       pChannel->TailPtr = entry_ptr;
+               }
+               CompManagerRemoveEntry(pChannel->Manager, pEntry);
+               InterlockedExchange(&pEntry->Busy, 0);
+       }
+       LeaveCriticalSection(&pChannel->Lock);
+       return *entry_ptr;
+}
+
 static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
 {
        pEntry->Next = NULL;
        EnterCriticalSection(&pChannel->Lock);
+       CompManagerQueue(pChannel->Manager, pEntry);
        CompChannelInsertTail(pChannel, pEntry);
+       SetEvent(pChannel->Event);
        LeaveCriticalSection(&pChannel->Lock);
 }
 
 DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry)
 {
-       COMP_MANAGER *mgr = pChannel->Manager;
-       COMP_CHANNEL *chan;
-       DWORD ret = 0;
-       ULONG locked;
+       COMP_ENTRY *entry;
+       DWORD ret;
 
        EnterCriticalSection(&pChannel->Lock);
        while (pChannel->Head == NULL) {
+               ResetEvent(pChannel->Event);
                LeaveCriticalSection(&pChannel->Lock);
 
-               locked = InterlockedCompareExchange(&mgr->Lock, 1, 0);
-               if (locked == 0) {
-                       ResetEvent(mgr->Event);
-                       ret = CompManagerPoll(mgr, pChannel->Milliseconds, 
&chan);
-                       InterlockedExchange(&mgr->Lock, 0);
-                       SetEvent(mgr->Event);
-               } else {
-                       ret = WaitForSingleObject(mgr->Event, 
pChannel->Milliseconds);
-               }
+               ret = WaitForSingleObject(pChannel->Event, 
pChannel->Milliseconds);
                if (ret) {
-                       goto out;
+                       return ret;
                }
 
                EnterCriticalSection(&pChannel->Lock);
        }
-       *ppEntry = CompChannelRemoveHead(pChannel);
+       entry = CompChannelRemoveHead(pChannel);
+       CompManagerRemoveEntry(pChannel->Manager, entry);
        LeaveCriticalSection(&pChannel->Lock);
 
-out:
+       InterlockedExchange(&entry->Busy, 0);
+       *ppEntry = entry;
+       ret = (entry == &pChannel->Entry) ? ERROR_CANCELLED : 0;
+
        return ret;
 }
 
-void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
+void CompChannelCancel(COMP_CHANNEL *pChannel)
 {
-       COMP_CHANNEL *chan;
-       COMP_ENTRY **entry_ptr;
-       DWORD ret;
-
-       do {
-               ret = CompManagerPoll(pChannel->Manager, 0, &chan);
-       } while (!ret);
-       SetEvent(pChannel->Manager->Event);
-
-       EnterCriticalSection(&pChannel->Lock);
-       entry_ptr = &pChannel->Head;
-       while (*entry_ptr && *entry_ptr != pEntry) {
-               entry_ptr = &(*entry_ptr)->Next;
+       if (InterlockedCompareExchange(&pChannel->Entry.Busy, 1, 0) == 0) {
+               PostQueuedCompletionStatus(pChannel->Manager->CompQueue, 0,
+                                                                  (ULONG_PTR) 
pChannel, &pChannel->Entry.Overlap);
        }
-
-       if (*entry_ptr != NULL) {
-               *entry_ptr = pEntry->Next;
-               if (pChannel->TailPtr == &pEntry->Next) {
-                       pChannel->TailPtr = entry_ptr;
-               }
-       }
-       LeaveCriticalSection(&pChannel->Lock);
 }
 
+
+/*
+ * Completion entry
+ */
+
 void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)
 {
+       RtlZeroMemory(pEntry, sizeof *pEntry);
        pEntry->Channel = pChannel;
 }
 
 DWORD CompEntryPost(COMP_ENTRY *pEntry)
 {
-       if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 
0,
-                                                                  
&pEntry->Overlap)) {
-               return 0;
-       } else {
-               return GetLastError();
+       if (InterlockedCompareExchange(&pEntry->Busy, 1, 0) == 0) {
+               if 
(!PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue,
+                                                                               
0, 0, &pEntry->Overlap)) {
+                       InterlockedExchange(&pEntry->Busy, 0);
+                       return GetLastError();
+               }
+       }
+       return 0;
+}
+
+void CompEntryCancel(COMP_ENTRY *pEntry)
+{
+       while (pEntry->Busy) {
+               Sleep(0);
+               CompChannelFindRemove(pEntry->Channel, pEntry);
        }
 }
--- trunk\inc\user\comp_channel.h       2009-03-10 02:09:09.765625000 -0700
+++ branches\winverbs\inc\user\comp_channel.h   2009-04-09 17:43:01.515373300 -0700
@@ -33,6 +33,7 @@
 #define COMP_CHANNEL_H
 
 #include <windows.h>
+#include <dlist.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -41,8 +42,10 @@ extern "C" {
 typedef struct _COMP_ENTRY
 {
        struct _COMP_ENTRY              *Next;
+       DLIST_ENTRY                             MgrEntry;
        OVERLAPPED                              Overlap;
        struct _COMP_CHANNEL    *Channel;
+       LONG volatile                   Busy;
 
 }      COMP_ENTRY;
 
@@ -51,6 +54,8 @@ typedef struct _COMP_CHANNEL
        struct _COMP_MANAGER    *Manager;
        COMP_ENTRY                              *Head;
        COMP_ENTRY                              **TailPtr;
+       COMP_ENTRY                              Entry;
+       HANDLE                                  Event;
        CRITICAL_SECTION                Lock;
        DWORD                                   Milliseconds;
 
@@ -59,8 +64,13 @@ typedef struct _COMP_CHANNEL
 typedef struct _COMP_MANAGER
 {
        HANDLE                                  CompQueue;
+       DLIST_ENTRY                             DoneList;
+       COMP_ENTRY                              Entry;
+       HANDLE                                  Thread;
+       BOOL                                    Run;
        HANDLE                                  Event;
-       LONG volatile                   Lock;
+       LONG volatile                   Busy;
+       CRITICAL_SECTION                Lock;
 
 }      COMP_MANAGER;
 
@@ -69,15 +79,17 @@ void                CompManagerClose(COMP_MANAGER *pMg
 DWORD          CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR 
Key);
 DWORD          CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,
                                                        COMP_CHANNEL 
**ppChannel);
+void           CompManagerCancel(COMP_MANAGER *pMgr);
 
-void           CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,
+DWORD          CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,
                                                        DWORD Milliseconds);
 void           CompChannelCleanup(COMP_CHANNEL *pChannel);
 DWORD          CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry);
-void           CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY 
*pEntry);
+void           CompChannelCancel(COMP_CHANNEL *pChannel);
 
 void           CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);
 DWORD          CompEntryPost(COMP_ENTRY *pEntry);
+void           CompEntryCancel(COMP_ENTRY *pEntry);
 
 #ifdef __cplusplus
 }

_______________________________________________
ofw mailing list
[email protected]
http://lists.openfabrics.org/cgi-bin/mailman/listinfo/ofw

Reply via email to