James,

Here is version 2 with the changes you requested. The README has also been
updated per Hal's comments.

 dapl/udapl/dapl_evd_wait.c
 dapl/udapl/Makefile
 dapl/common/dapl_evd_resize.c
 dapl/openib/TODO
 dapl/openib/dapl_ib_util.c
 dapl/openib/dapl_ib_cm.c
 dapl/openib/dapl_ib_util.h
 dapl/openib/README
 dapl/openib/dapl_ib_cq.c

Signed-off-by: Arlin Davis <[EMAIL PROTECTED]>

Index: dapl/udapl/dapl_evd_wait.c
===================================================================
--- dapl/udapl/dapl_evd_wait.c  (revision 2919)
+++ dapl/udapl/dapl_evd_wait.c  (working copy)
@@ -74,9 +74,10 @@
     DAPL_EVD           *evd_ptr;
     DAT_RETURN         dat_status;
     DAT_EVENT          *local_event;
-    DAT_BOOLEAN                notify_requested = DAT_FALSE;
+    DAT_BOOLEAN                notify_needed = DAT_FALSE;
     DAT_BOOLEAN                waitable;
     DAPL_EVD_STATE     evd_state;
+    DAT_COUNT          total_events,new_events;
 
     dapl_dbg_log (DAPL_DBG_TYPE_API,
                  "dapl_evd_wait (%p, %d, %d, %p, %p)\n", 
@@ -124,9 +125,9 @@
     }
 
     dapl_dbg_log (DAPL_DBG_TYPE_EVD, 
-                 "dapl_evd_wait: EVD %p, CQ %p\n", 
-                  evd_ptr,
-                 (void *)evd_ptr->ib_cq_handle);
+                 "dapl_evd_wait: EVD %p, CQ %p, Timeout %d, Threshold %d\n", 
+              evd_ptr,(void *)evd_ptr->ib_cq_handle, time_out, threshold);
+  
 
     /*
      * Make sure there are no other waiters and the evd is active.
@@ -144,11 +145,10 @@
     evd_state = dapl_os_atomic_assign ( (DAPL_ATOMIC *)&evd_ptr->evd_state,
                                        (DAT_COUNT) DAPL_EVD_STATE_OPEN,
                                        (DAT_COUNT) DAPL_EVD_STATE_WAITED );
-    dapl_os_unlock ( &evd_ptr->header.lock );
 
-    if ( evd_state != DAPL_EVD_STATE_OPEN )
+    dapl_os_unlock ( &evd_ptr->header.lock );
+    if ( evd_state != DAPL_EVD_STATE_OPEN || !waitable)
     {
-       /* Bogus state, bail out */
        dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
        goto bail;
     }
@@ -182,37 +182,54 @@
         * return right away if the ib_cq_handle associate with these evd
         * equal to IB_INVALID_HANDLE
         */
-       dapls_evd_copy_cq(evd_ptr);
-
-       if (dapls_rbuf_count(&evd_ptr->pending_event_queue) >= threshold)
-       {
-           break;
-       }
-
-       /*
-        * Do not enable the completion notification if this evd is not 
-        * a DTO_EVD or RMR_BIND_EVD
+       /* Logic to prevent missing completion between copy_cq (poll)
+        * and completion_notify (re-arm)  
         */
-       if ( (!notify_requested) &&
-             ((evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
-              (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG)) )
+       notify_needed = DAT_TRUE;
+       new_events = 0;
+       while (DAT_TRUE)
        {
-           dat_status = dapls_ib_completion_notify (
-               evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
-               evd_ptr,
-               (evd_ptr->completion_type == DAPL_EVD_STATE_SOLICITED_WAIT) ?
-                    IB_NOTIFY_ON_SOLIC_COMP : IB_NOTIFY_ON_NEXT_COMP );  
-
-           DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
-           /* FIXME report error */
-           dapl_os_assert(dat_status == DAT_SUCCESS);
+               dapls_evd_copy_cq(evd_ptr); /* poll for new completions */
+               total_events = dapls_rbuf_count 
(&evd_ptr->pending_event_queue); 
+               new_events = total_events - new_events;
+               if (total_events >= threshold ||
+                       (!new_events && notify_needed == DAT_FALSE))
+               {
+                       break;
+               } 
+                                                                               
        
+               /*
+                * Do not enable the completion notification if this evd is not 
+                * a DTO_EVD or RMR_BIND_EVD
+                */
+               if ( (evd_ptr->evd_flags & DAT_EVD_DTO_FLAG) ||
+                       (evd_ptr->evd_flags & DAT_EVD_RMR_BIND_FLAG) )
+               {
+                       dat_status = dapls_ib_completion_notify (
+                                       
evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+                                       evd_ptr,
+                                       (evd_ptr->completion_type == 
DAPL_EVD_STATE_SOLICITED_WAIT)
?
+                                       IB_NOTIFY_ON_SOLIC_COMP : 
IB_NOTIFY_ON_NEXT_COMP );  
+
+                       DAPL_CNTR(DCNT_EVD_WAIT_CMP_NTFY);
+                       notify_needed = DAT_FALSE;
+                       new_events = total_events;
+                       
+                       /* FIXME report error */
+                       dapl_os_assert(dat_status == DAT_SUCCESS);
+               } 
+               else 
+               {
+                       break;
+               }
 
-           notify_requested = DAT_TRUE;
+       } /* while completions < threshold, and rearm needed */
 
-           /* Try again.  */
-           continue;
+       if (total_events >= threshold)
+       {
+               break;
        }
-
+       
 
        /*
         * Unused by poster; it has no way to tell how many
@@ -232,8 +249,6 @@
 #endif
                dat_status = dapl_os_wait_object_wait (
                                &evd_ptr->wait_object, time_out );
-       
-       notify_requested = DAT_FALSE; /* We've used it up.  */
 
        /* See if we were awakened by evd_set_unwaitable */
        if ( !evd_ptr->evd_waitable )
@@ -243,13 +258,22 @@
 
        if (dat_status != DAT_SUCCESS)
        {
-           /*
-            * If the status is DAT_TIMEOUT, we'll break out of the
-            * loop, *not* dequeue an event (because dat_status
-            * != DAT_SUCCESS), set *nmore (as we should for timeout)
-            * and return DAT_TIMEOUT.
-            */
-           break;
+               /*
+                * If the status is DAT_TIMEOUT, we'll break out of the
+                * loop, *not* dequeue an event (because dat_status
+                * != DAT_SUCCESS), set *nmore (as we should for timeout)
+                * and return DAT_TIMEOUT.
+                */
+
+#if defined(DAPL_DBG)
+               dapls_evd_copy_cq(evd_ptr); /* poll */
+               dapl_dbg_log (DAPL_DBG_TYPE_EVD, 
+                       "dapl_evd_wait: WAKEUP ERROR (0x%x): EVD %p, CQ %p, 
events? %d\n", 
+                       dat_status,evd_ptr,(void *)evd_ptr->ib_cq_handle, 
+                       dapls_rbuf_count(&evd_ptr->pending_event_queue) );
+#endif /* DAPL_DBG */
+
+               break;
        }
     }
            
Index: dapl/udapl/Makefile
===================================================================
--- dapl/udapl/Makefile (revision 2941)
+++ dapl/udapl/Makefile (working copy)
@@ -122,7 +122,8 @@
 #
 ifeq ($(VERBS),openib)
 PROVIDER = $(TOPDIR)/../openib
-CFLAGS   += -DOPENIB -DCQ_WAIT_OBJECT
+CFLAGS   += -DOPENIB 
+#CFLAGS   += -DCQ_WAIT_OBJECT uncomment when fixed
 CFLAGS   += -I/usr/local/include/infiniband
 endif
 
Index: dapl/common/dapl_evd_resize.c
===================================================================
--- dapl/common/dapl_evd_resize.c       (revision 2919)
+++ dapl/common/dapl_evd_resize.c       (working copy)
@@ -67,71 +67,139 @@
        IN      DAT_EVD_HANDLE     evd_handle,
        IN      DAT_COUNT          evd_qlen )
 {
-    DAPL_IA            *ia_ptr;
-    DAPL_EVD           *evd_ptr;
-    DAT_COUNT          pend_cnt;
-    DAT_RETURN         dat_status;
+    DAPL_IA          *ia_ptr;
+    DAPL_EVD         *evd_ptr;
+    DAT_EVENT        *event_ptr;
+    DAT_EVENT        *events;
+    DAT_EVENT        *orig_event;
+    DAPL_RING_BUFFER free_event_queue;
+    DAPL_RING_BUFFER pending_event_queue;
+    DAT_COUNT        pend_cnt;
+    DAT_COUNT        i;
+    DAT_RETURN       dat_status;
 
     dapl_dbg_log (DAPL_DBG_TYPE_API, "dapl_evd_resize (%p, %d)\n",
                  evd_handle, evd_qlen);
 
     if (DAPL_BAD_HANDLE (evd_handle, DAPL_MAGIC_EVD))
     {
-       dat_status = DAT_ERROR (DAT_INVALID_HANDLE,0);
-       goto bail;
+        return DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG1);
     }
 
     evd_ptr     = (DAPL_EVD *)evd_handle;
     ia_ptr      = evd_ptr->header.owner_ia;
 
-    if ( evd_qlen == evd_ptr->qlen )
+    if ((evd_qlen <= 0) || (evd_ptr->qlen > evd_qlen))
     {
-        dat_status = DAT_SUCCESS;
-        goto bail;
+        dat_status = DAT_ERROR(DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
+       goto bail;
     }
 
     if ( evd_qlen > ia_ptr->hca_ptr->ia_attr.max_evd_qlen )
     {
-       dat_status = DAT_ERROR (DAT_INVALID_PARAMETER,DAT_INVALID_ARG2);
+       dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_TEVD);
        goto bail;
     }
 
     dapl_os_lock(&evd_ptr->header.lock);
 
-    /* Don't try to resize if we are actively waiting */
     if (evd_ptr->evd_state == DAPL_EVD_STATE_WAITED)
     {
-        dapl_os_unlock(&evd_ptr->header.lock);
-       dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
-       goto bail;
+        dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
+        goto bail_unlock;
     }
 
     pend_cnt = dapls_rbuf_count(&evd_ptr->pending_event_queue);
     if (pend_cnt > evd_qlen) {
-       dapl_os_unlock(&evd_ptr->header.lock);
-       dat_status = DAT_ERROR (DAT_INVALID_STATE,0);
-       goto bail;
+        dat_status = DAT_ERROR(DAT_INVALID_STATE,0);
+        goto bail_unlock;
     }
 
     dat_status = dapls_ib_cq_resize(evd_ptr->header.owner_ia,
-                                   evd_ptr,
-                                   &evd_qlen);
-    if (dat_status != DAT_SUCCESS)
+                                    evd_ptr,
+                                    &evd_qlen); 
+    if (DAT_SUCCESS != dat_status) {
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
+    }
+
+    /* Allocate EVENTs */
+    events = (DAT_EVENT *) dapl_os_alloc (evd_qlen * sizeof (DAT_EVENT));
+    if (!events)
     {
-        dapl_os_unlock(&evd_ptr->header.lock);
-       goto bail;
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
     }
+    event_ptr = events;
 
-    dat_status = dapls_evd_event_realloc (evd_ptr, evd_qlen);
-    if (dat_status != DAT_SUCCESS)
+    /* allocate free event queue */
+    dat_status = dapls_rbuf_alloc (&free_event_queue, evd_qlen);
+    if (DAT_SUCCESS != dat_status)
     {
-        dapl_os_unlock(&evd_ptr->header.lock);
-       goto bail;
+        dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
+    }
+
+    /* allocate pending event queue */
+    dat_status = dapls_rbuf_alloc (&pending_event_queue, evd_qlen);
+    if (DAT_SUCCESS != dat_status)
+    {
+        dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+        dat_status = DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+        goto bail_unlock;
     }
 
+    for (i = 0; i < pend_cnt; i++) 
+    {
+        orig_event = dapls_rbuf_remove(&evd_ptr->pending_event_queue);
+        if (orig_event == NULL) {
+            dapl_dbg_log (DAPL_DBG_TYPE_ERR, " Inconsistent event queue\n");
+            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+           dat_status = 
DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+           goto bail_unlock;
+        }
+        memcpy(event_ptr, orig_event, sizeof(DAT_EVENT));
+        dat_status = dapls_rbuf_add(&pending_event_queue, event_ptr);
+        if (DAT_SUCCESS != dat_status) {
+            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+           dat_status = 
DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+           goto bail_unlock;
+        }
+        event_ptr++;
+    }
+
+    for (i = pend_cnt; i < evd_qlen; i++)
+    {
+        dat_status = dapls_rbuf_add(&free_event_queue,(void *) event_ptr);
+        if (DAT_SUCCESS != dat_status) {
+            dapl_os_free(event_ptr, evd_qlen * sizeof (DAT_EVENT));
+           dat_status = 
DAT_ERROR(DAT_INSUFFICIENT_RESOURCES,DAT_RESOURCE_MEMORY);
+           goto bail_unlock;
+        }
+        event_ptr++;
+    }
+
+    dapls_rbuf_destroy (&evd_ptr->free_event_queue);
+    dapls_rbuf_destroy (&evd_ptr->pending_event_queue);
+    if (evd_ptr->events)
+    {
+        dapl_os_free (evd_ptr->events, evd_ptr->qlen * sizeof (DAT_EVENT));
+    }
+    evd_ptr->free_event_queue    = free_event_queue;
+    evd_ptr->pending_event_queue = pending_event_queue;
+    evd_ptr->events              = events;
+    evd_ptr->qlen                = evd_qlen;
+
+bail_unlock:
+
     dapl_os_unlock(&evd_ptr->header.lock);
 
- bail:
+    dapl_dbg_log (DAPL_DBG_TYPE_RTN,
+                 "dapl_evd_resize returns 0x%x\n",dat_status);
+
+bail:
+
     return dat_status;
 }
 
Index: dapl/openib/TODO
===================================================================
--- dapl/openib/TODO    (revision 2919)
+++ dapl/openib/TODO    (working copy)
@@ -1,7 +1,7 @@
 
 IB Verbs:
 - CQ resize?
-- query call to get current qp state 
+- query call to get current qp state, remote port number 
 - ibv_get_cq_event() needs timed event call and wakeup
 - query call to get device attributes
 - memory window support
@@ -9,8 +9,6 @@
 DAPL:
 - reinit EP needs a QP timewait completion notification
 - add cq_object wakeup, time based cq_object wait when verbs support arrives
-- update uDAPL code with real ATS support
-- etc, etc.
 
 Other:
 - Shared memory in udapl and kernel module to support?
Index: dapl/openib/dapl_ib_util.c
===================================================================
--- dapl/openib/dapl_ib_util.c  (revision 2919)
+++ dapl/openib/dapl_ib_util.c  (working copy)
@@ -111,27 +111,40 @@
 }
 
 
-/* just get IP address for hostname */
-int dapli_get_addr( char *addr, int addr_len)
+/* just get IP address, IPv4 only for now  */
+int dapli_get_hca_addr( struct dapl_hca *hca_ptr )
 {
-       struct sockaddr_in      *ipv4_addr = (struct sockaddr_in*)addr;
-       struct hostent          *h_ptr;
-       struct utsname          ourname;
-
-       if ( uname( &ourname ) < 0 ) 
-               return 1;
-
-       h_ptr = gethostbyname( ourname.nodename );
-       if ( h_ptr == NULL ) 
+       struct sockaddr_in      *ipv4_addr;
+       struct ib_at_completion at_comp;        
+       struct dapl_at_record   at_rec;
+       int                     status;
+       DAT_RETURN              dat_status;
+       
+       ipv4_addr = (struct sockaddr_in*)&hca_ptr->hca_address;
+       ipv4_addr->sin_family = AF_INET;
+       ipv4_addr->sin_addr.s_addr = 0;
+       
+       at_comp.fn = dapli_ip_comp_handler;
+       at_comp.context = &at_rec; 
+       at_rec.addr = &hca_ptr->hca_address;
+       at_rec.wait_object = &hca_ptr->ib_trans.wait_object;
+
+       /*  call with async_comp until the sync version works */
+       status = ib_at_ips_by_gid(&hca_ptr->ib_trans.gid, 
&ipv4_addr->sin_addr.s_addr, 1, 
+                                 &at_comp, &at_rec.req_id);
+       
+       if (status < 0) 
                return 1;
-
-       if ( h_ptr->h_addrtype == AF_INET ) {
-               ipv4_addr = (struct sockaddr_in*) addr;
-               ipv4_addr->sin_family = AF_INET;
-               dapl_os_memcpy( &ipv4_addr->sin_addr, h_ptr->h_addr_list[0], 4 
);
-       } else 
+ 
+        if (status > 0)
+                dapli_ip_comp_handler(at_rec.req_id, (void*)ipv4_addr, status);
+       
+       /* wait for answer, 5 seconds max */
+       dat_status = dapl_os_wait_object_wait 
(&hca_ptr->ib_trans.wait_object,5000000);
+        
+       if ((dat_status != DAT_SUCCESS ) || (!ipv4_addr->sin_addr.s_addr)) 
                return 1;
-
+               
        return 0;
 }
 
@@ -152,14 +165,17 @@
  */
 int32_t dapls_ib_init (void)
 {      
-       if (dapli_cm_thread_init())
-               return -1;
-       else
-               return 0;
+       dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_init: \n" );
+       if (dapli_cm_thread_init() || dapli_at_thread_init()) 
+               return 1;
+
+       return 0;
 }
 
 int32_t dapls_ib_release (void)
 {
+       dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " dapl_ib_release: \n" );
+       dapli_at_thread_destroy();
        dapli_cm_thread_destroy();
        return 0;
 }
@@ -186,7 +202,6 @@
         IN   DAPL_HCA          *hca_ptr)
 {
        struct dlist    *dev_list;
-       DAT_RETURN      dat_status = DAT_SUCCESS;
 
        dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
                      " open_hca: %s - %p\n", hca_name, hca_ptr );
@@ -217,36 +232,46 @@
                              ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
                return DAT_INTERNAL_ERROR;
        }
-  
+
        /* set inline max with enviromment or default, get local lid and gid 0 
*/
        hca_ptr->ib_trans.max_inline_send = 
                dapl_os_get_env_val ( "DAPL_MAX_INLINE", INLINE_SEND_DEFAULT );
 
-       if ( dapli_get_lid(hca_ptr, hca_ptr->port_num,
-                          &hca_ptr->ib_trans.lid )) {
+       if (dapli_get_lid(hca_ptr, hca_ptr->port_num,
+                          &hca_ptr->ib_trans.lid)) {
                dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
                              " open_hca: IB get LID failed for %s\n", 
                              ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-               return DAT_INTERNAL_ERROR;
+               goto bail;
        }
 
-       if ( dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,  
-                          &hca_ptr->ib_trans.gid )) {
+       if (dapli_get_gid(hca_ptr, hca_ptr->port_num, 0,  
+                          &hca_ptr->ib_trans.gid)) {
                dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
                              " open_hca: IB get GID failed for %s\n", 
                              ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-               return DAT_INTERNAL_ERROR;
+               goto bail;
        }
-
        /* get the IP address of the device */
-       if ( dapli_get_addr((char*)&hca_ptr->hca_address, 
-                           sizeof(DAT_SOCK_ADDR6) )) {
+       if (dapli_get_hca_addr(hca_ptr)) {
                dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
                              " open_hca: IB get ADDR failed for %s\n", 
                              ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
-               return DAT_INTERNAL_ERROR;
+               goto bail;
+       }
+
+       /* one thread for each device open */
+       if (dapli_cq_thread_init(hca_ptr)) {
+               dapl_dbg_log (DAPL_DBG_TYPE_ERR, 
+                             " open_hca: cq_thread_init failed for %s\n", 
+                             ibv_get_device_name(hca_ptr->ib_trans.ib_dev) );
+               goto bail;
        }
 
+       /* initialize cq_lock and wait object */
+       dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock);
+       dapl_os_wait_object_init (&hca_ptr->ib_trans.wait_object);
+  
        dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
                      " open_hca: %s, port %d, %s  %d.%d.%d.%d 
INLINE_MAX=%d\n", 
                      ibv_get_device_name(hca_ptr->ib_trans.ib_dev), 
hca_ptr->port_num,
@@ -257,7 +282,19 @@
                      ((struct sockaddr_in 
*)&hca_ptr->hca_address)->sin_addr.s_addr >> 24 & 0xff,
                      hca_ptr->ib_trans.max_inline_send );
 
-       return dat_status;
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                    " open_hca: LID 0x%x GID subnet %016llx id %016llx\n",
+                    hca_ptr->ib_trans.lid,
+                    (unsigned long 
long)bswap_64(hca_ptr->ib_trans.gid.global.subnet_prefix),
+                    (unsigned long 
long)bswap_64(hca_ptr->ib_trans.gid.global.interface_id) );
+
+       return DAT_SUCCESS;
+
+bail:
+       ibv_close_device(hca_ptr->ib_hca_handle); 
+       hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
+       return DAT_INTERNAL_ERROR;
+
 }
 
 
@@ -282,10 +319,14 @@
        dapl_dbg_log (DAPL_DBG_TYPE_UTIL," close_hca: %p\n",hca_ptr);
 
        if (hca_ptr->ib_hca_handle != IB_INVALID_HANDLE) {
+               dapli_cq_thread_destroy(hca_ptr);
                if (ibv_close_device(hca_ptr->ib_hca_handle)) 
                        return(dapl_convert_errno(errno,"ib_close_device"));
                hca_ptr->ib_hca_handle = IB_INVALID_HANDLE;
        }
+       
+       dapl_os_lock_destroy(&hca_ptr->ib_trans.cq_lock);
+
        return (DAT_SUCCESS);
 }
   
@@ -448,35 +489,4 @@
     return DAT_SUCCESS;
 }
 
-#ifdef PROVIDER_SPECIFIC_ATTR
-
-/*
- * dapls_set_provider_specific_attr
- *
- * Input:
- *     attr_ptr                Pointer provider attributes
- *
- * Output:
- *     none
- *
- * Returns:
- *     void
- */
-DAT_NAMED_ATTR ib_attrs[] = {
-    {
-       "I_DAT_SEND_INLINE_THRESHOLD",
-       "128"
-    },
-};
-
-#define SPEC_ATTR_SIZE( x )    (sizeof( x ) / sizeof( DAT_NAMED_ATTR))
-
-void dapls_set_provider_specific_attr(
-       IN DAT_PROVIDER_ATTR    *attr_ptr )
-{
-       attr_ptr->num_provider_specific_attr = SPEC_ATTR_SIZE( ib_attrs );
-       attr_ptr->provider_specific_attr     = ib_attrs;
-}
-
-#endif
 
Index: dapl/openib/dapl_ib_cm.c
===================================================================
--- dapl/openib/dapl_ib_cm.c    (revision 2919)
+++ dapl/openib/dapl_ib_cm.c    (working copy)
@@ -70,19 +70,8 @@
 static inline uint64_t cpu_to_be64(uint64_t x) { return x; }
 #endif
 
-#ifndef IB_AT
-
-#include <stdio.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <netinet/tcp.h>
-#include <sysfs/libsysfs.h>
-#include <signal.h>
-
-/* iclust-20 hard coded values, network order */
-#define REMOTE_GID     "fe80:0000:0000:0000:0002:c902:0000:4071"
-#define REMOTE_LID     "0002"
-
+static int                     g_at_destroy;
+static DAPL_OS_THREAD          g_at_thread;
 static int                     g_cm_destroy;
 static DAPL_OS_THREAD          g_cm_thread;
 static DAPL_OS_LOCK            g_cm_lock;
@@ -122,7 +111,7 @@
        while (g_cm_destroy) {
                struct timespec sleep, remain;
                sleep.tv_sec = 0;
-               sleep.tv_nsec = 200000000; /* 200 ms */
+               sleep.tv_nsec = 10000000; /* 10 ms */
                dapl_dbg_log(DAPL_DBG_TYPE_CM, 
                             " cm_thread_destroy: waiting for cm_thread\n");
                nanosleep (&sleep, &remain);
@@ -130,112 +119,70 @@
        dapl_dbg_log(DAPL_DBG_TYPE_CM," cm_thread_destroy(%d) exit\n",getpid());
 }
 
-static int ib_at_route_by_ip(uint32_t dst_ip, uint32_t src_ip, int tos, 
uint16_t flags,
-                            struct ib_at_ib_route *ib_route,
-                            struct ib_at_completion *async_comp)
-{
-       struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
-       
-       dapl_dbg_log (
-               DAPL_DBG_TYPE_CM, 
-               " CM at_route_by_ip: conn %p cm_id %d src %d.%d.%d.%d -> dst 
%d.%d.%d.%d (%d)\n", 
-               conn,conn->cm_id,
-               src_ip >> 0 & 0xff, src_ip >> 8 & 0xff,
-               src_ip >> 16 & 0xff,src_ip >> 24 & 0xff,
-               dst_ip >> 0 & 0xff, dst_ip >> 8 & 0xff,
-               dst_ip >> 16 & 0xff,dst_ip >> 24 & 0xff, conn->service_id);
-
-       /* use req_id for loopback indication */
-       if (( src_ip == dst_ip ) || ( dst_ip == 0x0100007f ))
-               async_comp->req_id = 1;
-       else
-               async_comp->req_id = 0;
-               
-       return 1;                            
-}
-
-static int ib_at_paths_by_route(struct ib_at_ib_route *ib_route, uint32_t 
mpath_type,
-                               struct ib_sa_path_rec *pr, int npath,
-                               struct ib_at_completion *async_comp)
+int dapli_at_thread_init(void)
 {
-       struct dapl_cm_id *conn = (struct dapl_cm_id *)async_comp->context;
-       char *env, *token;
-       char  dgid[40];
-       uint16_t *p_gid = (uint16_t*)&ib_route->gid;
+       DAT_RETURN dat_status;
 
-       /* set local path record values and send to remote */
-       (void)dapl_os_memzero(pr, sizeof(*pr));
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_init(%d)\n", getpid());
 
-       pr->slid = htons(conn->hca->ib_trans.lid);
-       pr->sgid.global.subnet_prefix = 
conn->hca->ib_trans.gid.global.subnet_prefix;
-       pr->sgid.global.interface_id  = 
conn->hca->ib_trans.gid.global.interface_id;
+       /* create thread to process AT async requests */
+       dat_status = dapl_os_thread_create(at_thread, NULL, &g_at_thread);
+       if (dat_status != DAT_SUCCESS)
+       {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR, 
+                            " at_thread_init: failed to create thread\n");
+               return 1;
+       }
+       return 0;
+}
 
-       env = getenv("DAPL_REMOTE_LID");
-       if ( env == NULL )
-               env = REMOTE_LID;
-       ib_route->lid = strtol(env,NULL,0);
+void dapli_at_thread_destroy(void)
+{
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d)\n", getpid());
 
-       env = getenv("DAPL_REMOTE_GID");
-       if ( env == NULL )
-               env = REMOTE_GID;
+       /* destroy cr_thread and lock */
+       g_at_destroy = 1;
+       pthread_kill( g_at_thread, SIGUSR1 );
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) SIGUSR1 
sent\n",getpid());
+       while (g_at_destroy) {
+               struct timespec sleep, remain;
+               sleep.tv_sec = 0;
+               sleep.tv_nsec = 10000000; /* 10 ms */
+               dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                            " at_thread_destroy: waiting for at_thread\n");
+               nanosleep (&sleep, &remain);
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread_destroy(%d) exit\n",getpid());
+}
 
-       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-                    " ib_at_paths_by_route: remote LID %x GID %s\n",
-                    ib_route->lid,env);
+void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num)
+{
+       struct dapl_at_record   *at_rec = context;
 
-       dapl_os_memcpy( dgid, env, 40 ); 
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                    " ip_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
+                    context, req_id, rec_num);
 
-       /* get GID with token strings and delimiter */
-       token = strtok(dgid,":");
-       while (token) {
-               *p_gid = strtoul(token,NULL,16);
-               *p_gid = htons(*p_gid); /* convert each token to network order 
*/
-               token = strtok(NULL,":");
-               p_gid++;
-       }
-       
-       /* set remote lid and gid, req_id is indication of loopback */
-       if ( !async_comp->req_id ) {
-               pr->dlid = htons(ib_route->lid);
-               pr->dgid.global.subnet_prefix = 
ib_route->gid.global.subnet_prefix;
-               pr->dgid.global.interface_id  = 
ib_route->gid.global.interface_id;
-       } else {
-               pr->dlid = pr->slid;
-               pr->dgid.global.subnet_prefix = pr->sgid.global.subnet_prefix;
-               pr->dgid.global.interface_id  = pr->sgid.global.interface_id;
-       }
-
-       pr->reversible = 0x1000000;
-       pr->pkey = 0xffff;
-       pr->mtu  = IBV_MTU_1024;
-       pr->mtu_selector  = 2;
-       pr->rate_selector = 2;
-       pr->rate          = 3;
-       pr->packet_life_time_selector = 2;
-       pr->packet_life_time          = 2;
+       if ((at_rec) && ( at_rec->req_id == req_id)) {
+               dapl_os_wait_object_wakeup(at_rec->wait_object);
+               return;
+       }
        
-       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-                    " ib_at_paths_by_route: SRC LID 0x%x GID subnet %016llx id 
%016llx\n",
-                    pr->slid,(unsigned long 
long)(pr->sgid.global.subnet_prefix),
-                    (unsigned long long)(pr->sgid.global.interface_id) );
-       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
-                    " ib_at_paths_by_route: DST LID 0x%x GID subnet %016llx id 
%016llx\n",
-                    pr->dlid,(unsigned long 
long)(pr->dgid.global.subnet_prefix),
-                    (unsigned long long)(pr->dgid.global.interface_id) );
-
-       dapli_path_comp_handler( async_comp->req_id, (void*)conn, 1);
-
-       return 0;
+       dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                    " ip_comp_handler: at_rec->req_id %lld != req_id %lld\n",
+                    at_rec->req_id, req_id );
 }
 
-#endif /* ifndef IB_AT */
-
 static void dapli_path_comp_handler(uint64_t req_id, void *context, int 
rec_num)
 {
        struct dapl_cm_id *conn = context;
        int status;
        ib_cm_events_t event;
 
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                    " path_comp_handler: ctxt %p, req_id %lld rec_num %d\n",
+                    context, req_id, rec_num);
+
        if (rec_num <= 0) {
                dapl_dbg_log(DAPL_DBG_TYPE_CM, 
                             " path_comp_handler: resolution err %d retry %d\n",
@@ -249,7 +196,7 @@
 
                status = ib_at_paths_by_route(&conn->dapl_rt, 0,
                                              &conn->dapl_path, 1,
-                                             &conn->dapl_comp);
+                                             &conn->dapl_comp, 
&conn->dapl_comp.req_id);
                if (status) {
                        dapl_dbg_log(DAPL_DBG_TYPE_CM,
                                     " path_by_route: err %d id %lld\n",
@@ -287,6 +234,21 @@
        int status;
        ib_cm_events_t event;
 
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                    " rt_comp_handler: conn %p, req_id %lld rec_num %d\n",
+                    conn, req_id, rec_num);
+
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                    " rt_comp_handler: SRC GID subnet %016llx id %016llx\n",
+                    (unsigned long 
long)cpu_to_be64(conn->dapl_rt.sgid.global.subnet_prefix),
+                    (unsigned long 
long)cpu_to_be64(conn->dapl_rt.sgid.global.interface_id) );
+
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, 
+                    " rt_comp_handler: DST GID subnet %016llx id %016llx\n",
+                    (unsigned long 
long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
+                    (unsigned long 
long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
+
+
        if (rec_num <= 0) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
                             " dapl_rt_comp_handler: rec %d retry %d\n",
@@ -298,7 +260,8 @@
                }
 
                status = ib_at_route_by_ip(((struct sockaddr_in 
*)&conn->r_addr)->sin_addr.s_addr,
-                                          0, 0, 0, &conn->dapl_rt, 
&conn->dapl_comp);
+                                          0, 0, 0, &conn->dapl_rt, 
+                                          
&conn->dapl_comp,&conn->dapl_comp.req_id);
                if (status < 0) {
                        dapl_dbg_log(DAPL_DBG_TYPE_ERR, "dapl_rt_comp_handler: "
                                    "ib_at_route_by_ip failed with status %d\n",
@@ -306,9 +269,16 @@
                        event = IB_CME_DESTINATION_UNREACHABLE;
                        goto bail;
                }
-
                if (status == 1)
                        dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
+
+               return;
+       }
+
+       if (!conn->dapl_rt.dgid.global.subnet_prefix || req_id != 
conn->dapl_comp.req_id) {
+               dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                            " dapl_rt_comp_handler: ERROR: unexpected callback 
req_id=%d(%d)\n",
+                            req_id, conn->dapl_comp.req_id ); 
                return;
        }
 
@@ -316,7 +286,7 @@
        conn->dapl_comp.context = conn;
        conn->retries = 0;
        status = ib_at_paths_by_route(&conn->dapl_rt, 0, &conn->dapl_path, 1,
-                                     &conn->dapl_comp);
+                                     &conn->dapl_comp, 
&conn->dapl_comp.req_id);
        if (status) {
                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
                             "dapl_rt_comp_handler: ib_at_paths_by_route "
@@ -346,8 +316,6 @@
                ib_cm_destroy_id(conn->cm_id);
                if (conn->ep)
                        conn->ep->cm_handle = IB_INVALID_HANDLE;
-               if (conn->sp)
-                       conn->sp->cm_srvc_handle = IB_INVALID_HANDLE;
 
                /* take off the CM thread work queue and free */
                dapl_os_lock( &g_cm_lock );
@@ -621,10 +589,8 @@
 }
 
 /* something to catch the signal */
-static void cm_handler(int signum)
+static void ib_sig_handler(int signum)
 {
-       dapl_dbg_log (DAPL_DBG_TYPE_CM," cm_thread(%d,0x%x): ENTER cm_handler %d\n",
-                       getpid(),g_cm_thread,signum);
        return;
 }
 
@@ -643,7 +609,7 @@
        sigemptyset(&sigset);
        sigaddset(&sigset, SIGUSR1);
        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
-       signal( SIGUSR1, cm_handler); 
+       signal( SIGUSR1, ib_sig_handler); 
        
        dapl_os_lock( &g_cm_lock );
        while (!g_cm_destroy) {
@@ -667,7 +633,7 @@
                dapl_dbg_log(DAPL_DBG_TYPE_CM,
                        " cm_thread: GET EVENT fd=%d n=%d\n",
                        ib_cm_get_fd(),ret);
-               if (ib_cm_event_get(&event)) { 
+               if (ib_cm_event_get_timed(0,&event)) { 
                        dapl_dbg_log(DAPL_DBG_TYPE_CM,
                                " cm_thread: ERR %s eventi_get on %d\n", 
                                strerror(errno), ib_cm_get_fd() );
@@ -732,6 +698,33 @@
        g_cm_destroy = 0;
 }
 
+/* async AT processing thread */
+void at_thread(void *arg) 
+{
+       sigset_t sigset;
+
+       dapl_dbg_log (DAPL_DBG_TYPE_CM,
+                     " at_thread(%d,0x%x): ENTER: at_fd %d\n",
+                     getpid(), g_at_thread, ib_at_get_fd());
+
+       sigemptyset(&sigset);
+       sigaddset(&sigset, SIGUSR1);
+       pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+       signal(SIGUSR1, ib_sig_handler); 
+       
+       while (!g_at_destroy) {
+               /* poll forever until callback or signal */
+               if (ib_at_callback_get_timed(-1) < 0) { 
+                       dapl_dbg_log(DAPL_DBG_TYPE_CM,
+                               " at_thread: SIG? ret=%s, destroy=%d\n", 
+                               strerror(errno), g_at_destroy );
+               }
+               dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread: callback woke\n");
+       }
+       dapl_dbg_log(DAPL_DBG_TYPE_CM," at_thread(%d) EXIT \n", getpid());
+       g_at_destroy = 0;
+}
+
 /************************ DAPL provider entry points **********************/
 
 /*
@@ -826,33 +819,34 @@
        conn->dapl_comp.context = conn;
        conn->retries = 0;
        dapl_os_memcpy(&conn->r_addr, r_addr, sizeof(DAT_SOCK_ADDR6));
+       
+       /* put on CM thread work queue */
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+       dapl_os_lock( &g_cm_lock );
+       dapl_llist_add_tail(&g_cm_list, 
+                           (DAPL_LLIST_ENTRY*)&conn->entry, conn);
+       dapl_os_unlock(&g_cm_lock);
 
        status = ib_at_route_by_ip(
                ((struct sockaddr_in *)&conn->r_addr)->sin_addr.s_addr, 
                ((struct sockaddr_in *)&conn->hca->hca_address)->sin_addr.s_addr, 
-               0, 0, &conn->dapl_rt, &conn->dapl_comp);
+               0, 0, &conn->dapl_rt, &conn->dapl_comp, &conn->dapl_comp.req_id);
+
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, " connect: at_route ret=%d,%s req_id %d GID %016llx %016llx\n", 
+                    status, strerror(errno), conn->dapl_comp.req_id,
+                    (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.subnet_prefix),
+                    (unsigned long long)cpu_to_be64(conn->dapl_rt.dgid.global.interface_id) );
 
        if (status < 0) {
                dat_status = dapl_convert_errno(errno,"ib_at_route_by_ip");
-               goto destroy;
+               dapli_destroy_cm_id(conn); 
+               return dat_status;
        }
-       if (status == 1)
-               dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, 1);
 
-       
-       /* put on CM thread work queue */
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-       dapl_os_lock( &g_cm_lock );
-       dapl_llist_add_tail(&g_cm_list, 
-                           (DAPL_LLIST_ENTRY*)&conn->entry, conn);
-       dapl_os_unlock(&g_cm_lock);
+       if (status > 0) 
+               dapli_rt_comp_handler(conn->dapl_comp.req_id, conn, status);
 
        return DAT_SUCCESS;
-
-destroy:
-       dapli_destroy_cm_id(conn); 
-       return dat_status;
-
 }
 
 /*
@@ -992,6 +986,13 @@
        conn->hca = ia_ptr->hca_ptr;
        conn->service_id = ServiceID;
 
+       /* put on CM thread work queue */
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
+       dapl_os_lock( &g_cm_lock );
+       dapl_llist_add_tail(&g_cm_list, 
+                       (DAPL_LLIST_ENTRY*)&conn->entry, conn);
+       dapl_os_unlock(&g_cm_lock);
+
        dapl_dbg_log(DAPL_DBG_TYPE_EP,
                     " setup_listener(conn=%p cm_id=%d)\n",
                     sp_ptr->cm_srvc_handle,conn->cm_id);
@@ -1003,19 +1004,13 @@
                        dat_status = DAT_CONN_QUAL_IN_USE;
                else
                        dat_status = DAT_INSUFFICIENT_RESOURCES;
-       /* success */ 
-       } else  { 
-               /* put on CM thread work queue */
-               dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&conn->entry);
-               dapl_os_lock( &g_cm_lock );
-               dapl_llist_add_tail(&g_cm_list, 
-                               (DAPL_LLIST_ENTRY*)&conn->entry, conn);
-               dapl_os_unlock(&g_cm_lock);
+
+               dapli_destroy_cm_id(conn);
                return dat_status;
        }
 
-        dapli_destroy_cm_id(conn);
-       return dat_status;
+       /* success */ 
+       return DAT_SUCCESS;
 }
 
 
@@ -1047,9 +1042,11 @@
                        " remove_listener(ia_ptr %p sp_ptr %p cm_ptr %p)\n",
                        ia_ptr, sp_ptr, conn );
        
-       if (sp_ptr->cm_srvc_handle != IB_INVALID_HANDLE) 
+       if (conn != IB_INVALID_HANDLE) { 
+               sp_ptr->cm_srvc_handle = NULL;
                dapli_destroy_cm_id(conn);
-       
+       }       
+
        return DAT_SUCCESS;
 }
 
Index: dapl/openib/dapl_ib_util.h
===================================================================
--- dapl/openib/dapl_ib_util.h  (revision 2919)
+++ dapl/openib/dapl_ib_util.h  (working copy)
@@ -53,6 +53,7 @@
 #include <byteswap.h>
 #include <infiniband/sa.h>
 #include <infiniband/cm.h>
+#include <infiniband/at.h>
 
 /* Typedefs to map common DAPL provider types to IB verbs */
 typedef        struct ibv_qp           *ib_qp_handle_t;
@@ -68,8 +69,8 @@
 
 #define IB_RC_RETRY_COUNT      7
 #define IB_RNR_RETRY_COUNT     7
-#define IB_CM_RESPONSE_TIMEOUT 20      /* 4 sec */
-#define IB_MAX_CM_RETRIES      4
+#define IB_CM_RESPONSE_TIMEOUT 18      /* 1 sec */
+#define IB_MAX_CM_RETRIES      7
 
 #define IB_REQ_MRA_TIMEOUT     27      /* a little over 9 minutes */
 #define IB_MAX_AT_RETRY                3
@@ -92,21 +93,12 @@
        IB_CME_BROKEN
 } ib_cm_events_t;
 
-#ifndef IB_AT
-/* implement a quick hack to exchange GID/LID's until user IB_AT arrives */
-struct ib_at_ib_route {
-        union ibv_gid   gid;
-        uint16_t        lid;
+struct dapl_at_record {
+       uint64_t                req_id;
+       DAT_SOCK_ADDR6          *addr;
+       DAPL_OS_WAIT_OBJECT     *wait_object;
 };
 
-struct ib_at_completion {
-        void (*fn)(uint64_t req_id, void *context, int rec_num);
-        void *context;
-        uint64_t req_id;
-};
-
-#endif
-
 /* 
  * dapl_llist_entry in dapl.h but dapl.h depends on provider 
  * typedef's in this file first. move dapl_llist_entry out of dapl.h
@@ -122,6 +114,7 @@
 struct dapl_cm_id {
        struct ib_llist_entry           entry;
        DAPL_OS_LOCK                    lock;
+       DAPL_OS_WAIT_OBJECT             wait_object;
        int                             retries;
        int                             destroy;
        int                             in_callback;
@@ -238,6 +231,10 @@
 { 
        struct  ibv_device      *ib_dev;
        ib_cq_handle_t          ib_cq_empty;
+       DAPL_OS_LOCK            cq_lock;
+       DAPL_OS_WAIT_OBJECT     wait_object;
+       int                     cq_destroy;
+       DAPL_OS_THREAD          cq_thread;
        int                     max_inline_send;
        uint16_t                lid;
        union ibv_gid           gid;
@@ -257,11 +254,18 @@
 void cm_thread (void *arg);
 int dapli_cm_thread_init(void);
 void dapli_cm_thread_destroy(void);
+void at_thread (void *arg);
+int dapli_at_thread_init(void);
+void dapli_at_thread_destroy(void);
+void cq_thread (void *arg);
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr);
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr);
 
-int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid );
+int dapli_get_lid(struct dapl_hca *hca_ptr, int port, uint16_t *lid);
 int dapli_get_gid(struct dapl_hca *hca_ptr, int port, int index, 
-                 union ibv_gid *gid );
-int dapli_get_addr(char *addr, int addr_len);
+                 union ibv_gid *gid);
+int dapli_get_hca_addr(struct dapl_hca *hca_ptr);
+void dapli_ip_comp_handler(uint64_t req_id, void *context, int rec_num);
 
 DAT_RETURN
 dapls_modify_qp_state ( IN ib_qp_handle_t      qp_handle,
Index: dapl/openib/README
===================================================================
--- dapl/openib/README  (revision 2919)
+++ dapl/openib/README  (working copy)
@@ -39,18 +39,16 @@
 
        server: dtest -s 
        client: dtest -h hostname
+
+Testing: dtest, dapltest - cl.sh regress.sh
        
-setup/known issues:
-       
-       First drop with uCM (without IBAT), tested with simple dtest across 2 nodes. 
-       hand rolled path records require remote LID and GID set via enviroment:
+Setup:
        
-       export DAPL_REMOTE_GID  "fe80:0000:0000:0000:0002:c902:0000:4071"
-       export DAPL_REMOTE_LID  "0002"
+       Third drop of code, includes uCM and uAT support.
+       NOTE: requires both uCM and uAT libraries and device modules from trunk.
 
-    Also, hard coded (RTR) for use with port 1 only.
-          
+Known issues:
        no memory windows support in ibverbs, dat_create_rmr fails.
+       some uCM scale up issues with an 8 thread dapltest in regress.sh
+       hard coded modify QP RTR to port 1, waiting for ib_cm_init_qp_attr call.
        
-
-
Index: dapl/openib/dapl_ib_cq.c
===================================================================
--- dapl/openib/dapl_ib_cq.c    (revision 2919)
+++ dapl/openib/dapl_ib_cq.c    (working copy)
@@ -50,9 +50,96 @@
 #include "dapl_adapter_util.h"
 #include "dapl_lmr_util.h"
 #include "dapl_evd_util.h"
+#include "dapl_ring_buffer_util.h"
 #include <sys/poll.h>
+#include <signal.h>
 
+int dapli_cq_thread_init(struct dapl_hca *hca_ptr)
+{
+        DAT_RETURN dat_status;
+
+        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_init(%p)\n", hca_ptr);
+
+        /* create thread to process inbound connect request */
+        dat_status = dapl_os_thread_create( cq_thread, (void*)hca_ptr, &hca_ptr->ib_trans.cq_thread);
+        if (dat_status != DAT_SUCCESS)
+        {
+                dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+                             " cq_thread_init: failed to create thread\n");
+                return 1;
+        }
+        return 0;
+}
+
+void dapli_cq_thread_destroy(struct dapl_hca *hca_ptr)
+{
+        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%p)\n", hca_ptr);
+
+        /* destroy cr_thread and lock */
+        hca_ptr->ib_trans.cq_destroy = 1;
+        pthread_kill(hca_ptr->ib_trans.cq_thread, SIGUSR1);
+        dapl_dbg_log(DAPL_DBG_TYPE_CM," cq_thread_destroy(%p) SIGUSR1 sent\n",hca_ptr);
+        while (hca_ptr->ib_trans.cq_destroy != 2) {
+                struct timespec sleep, remain;
+                sleep.tv_sec = 0;
+                sleep.tv_nsec = 10000000; /* 10 ms */
+                dapl_dbg_log(DAPL_DBG_TYPE_UTIL,
+                             " cq_thread_destroy: waiting for cq_thread\n");
+                nanosleep (&sleep, &remain);
+        }
+        dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread_destroy(%d) exit\n",getpid());
+       return;
+}
+
+/* something to catch the signal */
+static void ib_cq_handler(int signum)
+{
+        return;
+}
+
+void cq_thread( void *arg )
+{
+       struct dapl_hca *hca_ptr = arg;
+       struct dapl_evd *evd_ptr;
+       struct ibv_cq   *ibv_cq = NULL;
+       sigset_t        sigset;
+       int             status = 0; 
+
+       dapl_dbg_log ( DAPL_DBG_TYPE_UTIL," cq_thread: ENTER hca %p\n",hca_ptr);
+  
+        sigemptyset(&sigset);
+        sigaddset(&sigset,SIGUSR1);
+        pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
+        signal(SIGUSR1, ib_cq_handler);
+
+       /* wait on DTO event, or signal to abort */
+       while (!hca_ptr->ib_trans.cq_destroy) {
+
+               struct pollfd cq_poll = {
+                       .fd      = hca_ptr->ib_hca_handle->cq_fd[0],
+                       .events  = POLLIN,
+                       .revents = 0
+               };
 
+               status = poll(&cq_poll, 1, -1);
+               if ((status == 1) &&
+                       (!ibv_get_cq_event(hca_ptr->ib_hca_handle, 0, &ibv_cq, (void*)&evd_ptr))) {
+       
+                       if (DAPL_BAD_HANDLE(evd_ptr, DAPL_MAGIC_EVD))
+                               continue;
+
+                       /* process DTO event via callback */
+                       dapl_evd_dto_callback ( evd_ptr->header.owner_ia->hca_ptr->ib_hca_handle,
+                                               evd_ptr->ib_cq_handle,
+                                               (void*)evd_ptr );
+               } else {
+
+               }
+       } 
+       hca_ptr->ib_trans.cq_destroy = 2;
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL," cq_thread: EXIT: hca %p \n", hca_ptr);
+       return;
+}
 /*
  * Map all verbs DTO completion codes to the DAT equivelent.
  *
@@ -410,9 +497,9 @@
                IN DAPL_EVD             *evd_ptr,
                IN ib_wait_obj_handle_t *p_cq_wait_obj_handle )
 {
-       dapl_dbg_log (  DAPL_DBG_TYPE_UTIL, 
+       dapl_dbg_log (  DAPL_DBG_TYPE_CM, 
                        " cq_object_create: (%p)=%p\n", 
-                       p_cq_wait_obj_handle, *p_cq_wait_obj_handle);
+                       p_cq_wait_obj_handle, evd_ptr );
 
        /* set cq_wait object to evd_ptr */
        *p_cq_wait_obj_handle = evd_ptr;
@@ -447,33 +534,86 @@
 {
        DAPL_EVD                *evd_ptr = p_cq_wait_obj_handle;
        ib_cq_handle_t          cq = evd_ptr->ib_cq_handle;
-       struct ibv_cq           *ibv_cq;
-       void                    *ibv_ctx;
-       int                     status = -ETIMEDOUT; 
+       struct ibv_cq           *ibv_cq = NULL;
+       void                    *ibv_ctx = NULL;
+       int                     status = 0; 
 
-       dapl_dbg_log ( DAPL_DBG_TYPE_UTIL, 
+       dapl_dbg_log ( DAPL_DBG_TYPE_CM, 
                        " cq_object_wait: dev %p evd %p cq %p, time %d\n", 
                        cq->context, evd_ptr, cq, timeout );
 
-       /* Multiple EVD's sharing one event handle for now */
-       if (cq) {
-               struct pollfd cq_poll = { 
-                       .fd      = cq->context->cq_fd[0],
-                       .events  = POLLIN
+       /* Multiple EVD's sharing one event handle for now until uverbs supports more */
+
+       /*
+        *  This makes it very inefficient and tricky to manage multiple CQ per device open
+        *  For example: 4 threads waiting on separate CQ events will all be woke when
+        *  a CQ event fires. So the poll wakes up and the first thread to get to the
+        *  the get_cq_event wins and the other 3 will block. The dapl_evd_wait code
+        *  above will immediately do a poll_cq after returning from CQ wait and if
+        *  nothing on the queue will call this wait again and go back to sleep. So
+        *  as long as they all wake up, a mutex is held around the get_cq_event
+        *  so no blocking occurs and they all return then everything should work.
+        *  Of course, the timeout needs adjusted on the threads that go back to sleep.
+        */
+       while (cq) {
+               struct pollfd cq_poll = {
+                       .fd      = cq->context->cq_fd[0],
+                       .events  = POLLIN,
+                       .revents = 0
                };
-               int     timeout_ms = -1;
+               int     timeout_ms = -1;
 
                if (timeout != DAT_TIMEOUT_INFINITE)
                        timeout_ms = timeout/1000;
-               
+
+               /* check if another thread processed the event already, pending queue > 0 */
+               dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+               if (dapls_rbuf_count(&evd_ptr->pending_event_queue)) {
+                       dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+                       break;  
+               }
+               dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+
+               dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: polling\n");
                status = poll(&cq_poll, 1, timeout_ms);
-               if (status == 1)
-                       status = ibv_get_cq_event(cq->context, 
-                                                 0, &ibv_cq, &ibv_ctx);
-       }
-       dapl_dbg_log (DAPL_DBG_TYPE_UTIL, 
-                     " cq_object_wait: RET cq %p ibv_cq %p ibv_ctx %p %x\n",
-                     cq,ibv_cq,ibv_ctx,status);
+               dapl_dbg_log ( DAPL_DBG_TYPE_CM," cq_object_wait: poll returned status=%d\n",status);
+
+               /*
+                * If poll with timeout wakes then hold mutex around a poll with no timeout
+                * so subsequent get_cq_events will be guaranteed not to block
+                * If the event does not belong to this EVD then put it on proper EVD pending 
+                * queue under the mutex.
+                */
+               if (status == 1) {
+                       dapl_os_lock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+                       status = poll(&cq_poll, 1, 0);
+                       if (status == 1) {
+                               status = ibv_get_cq_event(cq->context,
+                                                         0, &ibv_cq, &ibv_ctx);
+
+                               /* if event is not ours, put on proper evd pending queue */
+                               /* force another wakeup */
+                               if ((ibv_ctx != evd_ptr ) && 
+                                   (!DAPL_BAD_HANDLE(ibv_ctx, DAPL_MAGIC_EVD))) {
+                                       dapl_dbg_log (DAPL_DBG_TYPE_CM,
+                                                     " cq_object_wait: ibv_ctx %p != evd %p\n",
+                                                     ibv_ctx, evd_ptr);
+                                       dapls_evd_copy_cq((struct evd_ptr*)ibv_ctx); 
+                                       dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+                                       continue;
+                               }       
+                       }       
+                       dapl_os_unlock( &evd_ptr->header.owner_ia->hca_ptr->ib_trans.cq_lock );
+                       break;
+
+               } else if (status == 0) {
+                       status = ETIMEDOUT;  
+                       break;
+               }
+       }       
+       dapl_dbg_log (DAPL_DBG_TYPE_CM, 
+                     " cq_object_wait: RET evd %p cq %p ibv_cq %p ibv_ctx %p %s\n",
+                     evd_ptr, cq,ibv_cq,ibv_ctx,strerror(errno));
        
        return(dapl_convert_errno(status,"cq_wait_object_wait"));
        





_______________________________________________
openib-general mailing list
[email protected]
http://openib.org/mailman/listinfo/openib-general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to