Author: pjd
Date: Thu Oct 27 20:32:57 2011
New Revision: 226859
URL: http://svn.freebsd.org/changeset/base/226859

Log:
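  Implement 'async' mode for HAST.
  
  In 'async' mode the primary node acknowledges a write request to
  GEOM Gate as soon as the local write completes successfully, without
  waiting for the secondary node to confirm it.
  The replication mode is now recorded in each request when it is
  initiated, so changing the replication mode on configuration reload
  no longer forces a reconnect to the remote node.
  The 'memsync' mode is still not implemented and falls back to 'fullsync'.
  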
  MFC after:    3 days

Modified:
  head/sbin/hastd/hast.conf.5
  head/sbin/hastd/parse.y
  head/sbin/hastd/primary.c

Modified: head/sbin/hastd/hast.conf.5
==============================================================================
--- head/sbin/hastd/hast.conf.5 Thu Oct 27 20:23:03 2011        (r226858)
+++ head/sbin/hastd/hast.conf.5 Thu Oct 27 20:32:57 2011        (r226859)
@@ -28,7 +28,7 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd May 20, 2011
+.Dd October 27, 2011
 .Dt HAST.CONF 5
 .Os
 .Sh NAME
@@ -224,9 +224,6 @@ completes.
 This is the fastest and the most dangerous replication mode.
 This mode should be used when replicating to a distant node where
 latency is too high for other modes.
-The
-.Ic async
-replication mode is currently not implemented.
 .El
 .It Ic checksum Aq algorithm
 .Pp

Modified: head/sbin/hastd/parse.y
==============================================================================
--- head/sbin/hastd/parse.y     Thu Oct 27 20:23:03 2011        (r226858)
+++ head/sbin/hastd/parse.y     Thu Oct 27 20:32:57 2011        (r226859)
@@ -301,11 +301,9 @@ yy_config_parse(const char *config, bool
                         */
                        curres->hr_replication = depth0_replication;
                }
-               if (curres->hr_replication == HAST_REPLICATION_MEMSYNC ||
-                   curres->hr_replication == HAST_REPLICATION_ASYNC) {
+               if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) {
                        pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".",
-                           curres->hr_replication == HAST_REPLICATION_MEMSYNC ?
-                           "memsync" : "async", "fullsync");
+                           "memsync", "fullsync");
                        curres->hr_replication = HAST_REPLICATION_FULLSYNC;
                }
                if (curres->hr_checksum == -1) {

Modified: head/sbin/hastd/primary.c
==============================================================================
--- head/sbin/hastd/primary.c   Thu Oct 27 20:23:03 2011        (r226858)
+++ head/sbin/hastd/primary.c   Thu Oct 27 20:32:57 2011        (r226859)
@@ -89,6 +89,15 @@ struct hio {
         * Structure used to communicate with GEOM Gate class.
         */
        struct g_gate_ctl_io     hio_ggio;
+       /*
+        * Request was already confirmed to GEOM Gate.
+        */
+       bool                     hio_done;
+       /*
+        * Remember replication from the time the request was initiated,
+        * so we won't get confused when replication changes on reload.
+        */
+       int                      hio_replication;
        TAILQ_ENTRY(hio)        *hio_next;
 };
 #define        hio_free_next   hio_next[0]
@@ -1056,6 +1065,42 @@ remote_close(struct hast_resource *res, 
 }
 
 /*
+ * Acknowledge write completion to the kernel, but don't update activemap yet.
+ */
+static void
+write_complete(struct hast_resource *res, struct hio *hio)
+{
+       struct g_gate_ctl_io *ggio;
+       unsigned int ncomp;
+
+       PJDLOG_ASSERT(!hio->hio_done);
+
+       ggio = &hio->hio_ggio;
+       PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE);
+
+       /*
+        * Bump local count if this is first write after
+        * connection failure with remote node.
+        */
+       ncomp = 1;
+       rw_rlock(&hio_remote_lock[ncomp]);
+       if (!ISCONNECTED(res, ncomp)) {
+               mtx_lock(&metadata_lock);
+               if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) {
+                       res->hr_primary_localcnt++;
+                       pjdlog_debug(1, "Increasing localcnt to %ju.",
+                           (uintmax_t)res->hr_primary_localcnt);
+                       (void)metadata_write(res);
+               }
+               mtx_unlock(&metadata_lock);
+       }
+       rw_unlock(&hio_remote_lock[ncomp]);
+       if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
+               primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
+       hio->hio_done = true;
+}
+
+/*
  * Thread receives ggate I/O requests from the kernel and passes them to
  * appropriate threads:
  * WRITE - always goes to both local_send and remote_send threads
@@ -1075,8 +1120,6 @@ ggate_recv_thread(void *arg)
        unsigned int ii, ncomp, ncomps;
        int error;
 
-       ncomps = HAST_NCOMPONENTS;
-
        for (;;) {
                pjdlog_debug(2, "ggate_recv: Taking free request.");
                QUEUE_TAKE2(hio, free);
@@ -1085,6 +1128,8 @@ ggate_recv_thread(void *arg)
                ggio->gctl_unit = res->hr_ggateunit;
                ggio->gctl_length = MAXPHYS;
                ggio->gctl_error = 0;
+               hio->hio_done = false;
+               hio->hio_replication = res->hr_replication;
                pjdlog_debug(2,
                    "ggate_recv: (%p) Waiting for request from the kernel.",
                    hio);
@@ -1117,11 +1162,16 @@ ggate_recv_thread(void *arg)
                        primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.",
                            strerror(error));
                }
+
+               ncomp = 0;
+               ncomps = HAST_NCOMPONENTS;
+
                for (ii = 0; ii < ncomps; ii++)
                        hio->hio_errors[ii] = EINVAL;
                reqlog(LOG_DEBUG, 2, ggio,
                    "ggate_recv: (%p) Request received from the kernel: ",
                    hio);
+
                /*
                 * Inform all components about new write request.
                 * For read request prefer local component unless the given
@@ -1130,10 +1180,7 @@ ggate_recv_thread(void *arg)
                switch (ggio->gctl_cmd) {
                case BIO_READ:
                        res->hr_stat_read++;
-                       pjdlog_debug(2,
-                           "ggate_recv: (%p) Moving request to the send queue.",
-                           hio);
-                       refcount_init(&hio->hio_countdown, 1);
+                       ncomps = 1;
                        mtx_lock(&metadata_lock);
                        if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF ||
                            res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) {
@@ -1155,7 +1202,6 @@ ggate_recv_thread(void *arg)
                                ncomp = 1;
                        }
                        mtx_unlock(&metadata_lock);
-                       QUEUE_INSERT1(hio, send, ncomp);
                        break;
                case BIO_WRITE:
                        res->hr_stat_write++;
@@ -1198,25 +1244,19 @@ ggate_recv_thread(void *arg)
                                (void)hast_activemap_flush(res);
                        }
                        mtx_unlock(&res->hr_amp_lock);
-                       /* FALLTHROUGH */
+                       break;
                case BIO_DELETE:
+                       res->hr_stat_delete++;
+                       break;
                case BIO_FLUSH:
-                       switch (ggio->gctl_cmd) {
-                       case BIO_DELETE:
-                               res->hr_stat_delete++;
-                               break;
-                       case BIO_FLUSH:
-                               res->hr_stat_flush++;
-                               break;
-                       }
-                       pjdlog_debug(2,
-                           "ggate_recv: (%p) Moving request to the send queue.",
-                           hio);
-                       refcount_init(&hio->hio_countdown, ncomps);
-                       for (ii = 0; ii < ncomps; ii++)
-                               QUEUE_INSERT1(hio, send, ii);
+                       res->hr_stat_flush++;
                        break;
                }
+               pjdlog_debug(2,
+                   "ggate_recv: (%p) Moving request to the send queues.", hio);
+               refcount_init(&hio->hio_countdown, ncomps);
+               for (ii = ncomp; ii < ncomps; ii++)
+                       QUEUE_INSERT1(hio, send, ii);
        }
        /* NOTREACHED */
        return (NULL);
@@ -1285,6 +1325,11 @@ local_send_thread(void *arg)
                                    ret, (intmax_t)ggio->gctl_length);
                        } else {
                                hio->hio_errors[ncomp] = 0;
+                               if (hio->hio_replication ==
+                                   HAST_REPLICATION_ASYNC) {
+                                       ggio->gctl_error = 0;
+                                       write_complete(res, hio);
+                               }
                        }
                        break;
                case BIO_DELETE:
@@ -1668,7 +1713,7 @@ ggate_send_thread(void *arg)
        struct hast_resource *res = arg;
        struct g_gate_ctl_io *ggio;
        struct hio *hio;
-       unsigned int ii, ncomp, ncomps;
+       unsigned int ii, ncomps;
 
        ncomps = HAST_NCOMPONENTS;
 
@@ -1718,28 +1763,14 @@ ggate_send_thread(void *arg)
                        if (range_sync_wait)
                                cv_signal(&range_sync_cond);
                        mtx_unlock(&range_lock);
-                       /*
-                        * Bump local count if this is first write after
-                        * connection failure with remote node.
-                        */
-                       ncomp = 1;
-                       rw_rlock(&hio_remote_lock[ncomp]);
-                       if (!ISCONNECTED(res, ncomp)) {
-                               mtx_lock(&metadata_lock);
-                               if (res->hr_primary_localcnt ==
-                                   res->hr_secondary_remotecnt) {
-                                       res->hr_primary_localcnt++;
-                                       pjdlog_debug(1,
-                                           "Increasing localcnt to %ju.",
-                                           (uintmax_t)res->hr_primary_localcnt);
-                                       (void)metadata_write(res);
-                               }
-                               mtx_unlock(&metadata_lock);
+                       if (!hio->hio_done)
+                               write_complete(res, hio);
+               } else {
+                       if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) {
+                               primary_exit(EX_OSERR,
+                                   "G_GATE_CMD_DONE failed");
                        }
-                       rw_unlock(&hio_remote_lock[ncomp]);
                }
-               if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0)
-                       primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed");
                pjdlog_debug(2,
                    "ggate_send: (%p) Moving request to the free queue.", hio);
                QUEUE_INSERT2(hio, free);
@@ -1892,6 +1923,8 @@ sync_thread(void *arg __unused)
                ggio->gctl_offset = offset;
                ggio->gctl_length = length;
                ggio->gctl_error = 0;
+               hio->hio_done = false;
+               hio->hio_replication = res->hr_replication;
                for (ii = 0; ii < ncomps; ii++)
                        hio->hio_errors[ii] = EINVAL;
                reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ",
@@ -2080,8 +2113,7 @@ primary_config_reload(struct hast_resour
         * Don't bother if we need to reconnect.
         */
        if ((modified & MODIFIED_TIMEOUT) != 0 &&
-           (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
-           MODIFIED_REPLICATION)) == 0) {
+           (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) {
                for (ii = 0; ii < ncomps; ii++) {
                        if (!ISREMOTE(ii))
                                continue;
@@ -2103,8 +2135,7 @@ primary_config_reload(struct hast_resour
                        }
                }
        }
-       if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR |
-           MODIFIED_REPLICATION)) != 0) {
+       if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) {
                for (ii = 0; ii < ncomps; ii++) {
                        if (!ISREMOTE(ii))
                                continue;
_______________________________________________
svn-src-head@freebsd.org mailing list
http://lists.freebsd.org/mailman/listinfo/svn-src-head
To unsubscribe, send any mail to "svn-src-head-unsubscr...@freebsd.org"

Reply via email to