Author: adrian.chadd
Date: Sun Jul 19 06:41:02 2009
New Revision: 14212

Modified:
    playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h
    playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c
    playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h

Log:
Add the new COSS rebuild code which uses the helper.



Modified: playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h       (original)
+++ playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h       Sun Jul 19 06:41:02 2009
@@ -156,10 +156,6 @@
      int curstripe;
      struct {
        char rebuilding;
-       char reading;
-       int curstripe;
-       char *buf;
-       int buflen;
      } rebuild;
      int max_disk_nf;


Modified: playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c       (original)
+++ playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c       Sun Jul 19 06:41:02 2009
@@ -37,11 +37,11 @@

  #include "../../libasyncio/aiops.h"
  #include "../../libasyncio/async_io.h"
+#include "../../libsqstore/store_log.h"
+
  #include "store_coss.h"
  #include "store_rebuild_coss.h"

-static void storeDirCoss_ReadStripe(RebuildState * rb);
-
  static void
  storeCossRebuildComplete(void *data)
  {
@@ -53,44 +53,45 @@
      storeRebuildComplete(&rb->counts);
      debug(47, 1) ("COSS: %s: Rebuild Completed\n", stripePath(SD));
      cs->rebuild.rebuilding = 0;
+    if (rb->helper.pid != -1)
+        ipcClose(rb->helper.pid, rb->helper.r_fd, rb->helper.w_fd);
+    safe_free(rb->rbuf.buf);
      debug(47, 1) ("  %d objects scanned, %d objects relocated, %d objects  
fresher, %d objects ignored\n",
          rb->counts.scancount, rb->cosscounts.reloc,  
rb->cosscounts.fresher, rb->cosscounts.unknown);
      cbdataFree(rb);
  }

  static void
-storeCoss_AddStoreEntry(RebuildState * rb, const cache_key * key,  
StoreEntry * e)
+storeCoss_AddStoreEntry(RebuildState * rb, const cache_key * key,  
storeSwapLogData *d)
  {
      StoreEntry *ne;
      SwapDir *SD = rb->sd;
      CossInfo *cs = SD->fsdata;
      rb->counts.objcount++;
-    /* The Passed-in store entry is temporary; don't bloody use it  
directly! */
-    assert(e->swap_dirn == SD->index);
+
      ne = new_StoreEntry(STORE_ENTRY_WITHOUT_MEMOBJ, NULL);
      ne->store_status = STORE_OK;
      storeSetMemStatus(ne, NOT_IN_MEMORY);
      ne->swap_status = SWAPOUT_DONE;
-    ne->swap_filen = e->swap_filen;
+    ne->swap_filen = d->swap_filen;
      ne->swap_dirn = SD->index;
-    ne->swap_file_sz = e->swap_file_sz;
+    ne->swap_file_sz = d->swap_file_sz;
      ne->lock_count = 0;
-    ne->lastref = e->lastref;
-    ne->timestamp = e->timestamp;
-    ne->expires = e->expires;
-    ne->lastmod = e->lastmod;
-    ne->refcount = e->refcount;
-    ne->flags = e->flags;
+    ne->lastref = d->lastref;
+    ne->timestamp = d->timestamp;
+    ne->expires = d->expires;
+    ne->lastmod = d->lastmod;
+    ne->refcount = d->refcount;
+    ne->flags = d->flags;
      EBIT_SET(ne->flags, ENTRY_CACHABLE);
      EBIT_CLR(ne->flags, RELEASE_REQUEST);
      EBIT_CLR(ne->flags, KEY_PRIVATE);
      ne->ping_status = PING_NONE;
      EBIT_CLR(ne->flags, ENTRY_VALIDATED);
      storeHashInsert(ne, key); /* do it after we clear KEY_PRIVATE */
-    storeCossAdd(SD, ne, cs->rebuild.curstripe);
+    storeCossAdd(SD, ne,  storeCossFilenoToStripe(cs, d->swap_filen));
      storeEntryDump(ne, 5);
      assert(ne->repl.data != NULL);
-    assert(e->repl.data == NULL);
  }

  static void
@@ -114,7 +115,7 @@
   * stripe.
   */
  static void
-storeCoss_ConsiderStoreEntry(RebuildState * rb, const cache_key * key,  
StoreEntry * e)
+storeCoss_ConsiderStoreEntry(RebuildState * rb, const cache_key * key,  
storeSwapLogData * d)
  {
      StoreEntry *oe;

@@ -122,9 +123,9 @@
      oe = storeGet(key);
      if (oe == NULL) {
        rb->cosscounts.new++;
-       debug(47, 3) ("COSS: Adding filen %d\n", e->swap_filen);
+       debug(47, 3) ("COSS: Adding filen %d\n", d->swap_filen);
        /* no clash! woo, can add and forget */
-       storeCoss_AddStoreEntry(rb, key, e);
+       storeCoss_AddStoreEntry(rb, key, d);
        return;
      }
      /* This isn't valid - its possible we have a fresher object in another  
store */
@@ -134,267 +135,135 @@
      /* Dang, its a clash. See if its fresher */

      /* Fresher? Its a new object: deallocate the old one, reallocate the  
new one */
-    if (e->lastref > oe->lastref) {
-       debug(47, 3) ("COSS: fresher object for filen %d found (%ld -> %ld)\n", 
 
oe->swap_filen, (long int) oe->timestamp, (long int) e->timestamp);
+    if (d->lastref > oe->lastref) {
+       debug(47, 3) ("COSS: fresher object for filen %d found (%ld -> %ld)\n", 
 
oe->swap_filen, (long int) oe->timestamp, (long int) d->timestamp);
        rb->cosscounts.fresher++;
        storeCoss_DeleteStoreEntry(rb, key, oe);
        oe = NULL;
-       storeCoss_AddStoreEntry(rb, key, e);
+       storeCoss_AddStoreEntry(rb, key, d);
        return;
      }
      /*
       * Not fresher? Its the same object then we /should/ probably relocate  
it; I'm
       * not sure what should be done here.
       */
-    if (oe->timestamp == e->timestamp && oe->expires == e->expires) {
-       debug(47, 3) ("COSS: filen %d -> %d (since they're the same!)\n",  
oe->swap_filen, e->swap_filen);
+    if (oe->timestamp == d->timestamp && oe->expires == d->expires) {
+       debug(47, 3) ("COSS: filen %d -> %d (since they're the same!)\n",  
oe->swap_filen, d->swap_filen);
        rb->cosscounts.reloc++;
        storeCoss_DeleteStoreEntry(rb, key, oe);
        oe = NULL;
-       storeCoss_AddStoreEntry(rb, key, e);
+       storeCoss_AddStoreEntry(rb, key, d);
        return;
      }
-    debug(47, 3) ("COSS: filen %d: ignoring this one for some reason\n",  
e->swap_filen);
+    debug(47, 3) ("COSS: filen %d: ignoring this one for some reason\n",  
d->swap_filen);
      rb->cosscounts.unknown++;
  }

-
-
  /*
- * Take a stripe and attempt to place objects into it
+ * This is a cut-and-paste from the AUFS helper read function.
+ * Tsk!
   */
  static void
-storeDirCoss_ParseStripeBuffer(RebuildState * rb)
+storeCossRebuildHelperRead(int fd, void *data)
  {
-    SwapDir *SD = rb->sd;
-    CossInfo *cs = SD->fsdata;
-    tlv *t, *tlv_list;
-    int j = 0;
-    int bl = 0;
-    int tmp;
-    squid_off_t *l, len = 0;
-    int blocksize = cs->blksz_mask + 1;
-    StoreEntry tmpe;
-    cache_key key[SQUID_MD5_DIGEST_LENGTH];
-    sfileno filen;
-
-    assert(cs->rebuild.rebuilding == 1);
-    assert(cs->numstripes > 0);
-    assert(cs->rebuild.buf != NULL);
+        RebuildState *rb = data;
+        SwapDir *sd = rb->sd;
+        /* squidaioinfo_t *aioinfo = (squidaioinfo_t *) sd->fsdata; */
+        int r, i;
+        storeSwapLogData s;
+
+        assert(fd == rb->helper.r_fd);
+        debug(47, 5) ("storeCossRebuildHelperRead: %s: ready for helper  
read\n", sd->path);
+
+        assert(rb->rbuf.size - rb->rbuf.used > 0);
+        debug(47, 8) ("storeCossRebuildHelperRead: %s: trying to read %d  
bytes\n", sd->path, rb->rbuf.size - rb->rbuf.used);
+        r = FD_READ_METHOD(fd, rb->rbuf.buf + rb->rbuf.used, rb->rbuf.size  
- rb->rbuf.used);
+        debug(47, 8) ("storeCossRebuildHelperRead: %s: read %d bytes\n",  
sd->path, r);
+        if (r <= 0) {
+                /* Error or EOF */
+                debug(47, 1) ("storeCossRebuildHelperRead: %s: read  
returned %d; error/eof?\n", sd->path, r);
+                storeCossRebuildComplete(rb);
+                return;
+        }
+        rb->rbuf.used += r;
+
+        /* We have some data; process what we can */
+        i = 0;
+        while (i + sizeof(storeSwapLogData) <= rb->rbuf.used) {
+                memcpy(&s, rb->rbuf.buf + i, sizeof(storeSwapLogData));
+                switch (s.op) {
+                        case SWAP_LOG_VERSION:
+                                break;
+                        case SWAP_LOG_PROGRESS:
+                                storeRebuildProgress(rb->sd->index,
+                                    ((storeSwapLogProgress *)(&s))->total,  
((storeSwapLogProgress *)(&s))->progress);
+                                break;
+                        case SWAP_LOG_COMPLETED:
+                                debug(47, 1) ("  %s: completed rebuild\n",  
sd->path);
+                                storeCossRebuildComplete(rb);
+                                return;
+                        default:
+                                rb->n_read++;
+                                /*  
storeAufsDirRebuildFromSwapLogObject(rb, s); */
+                               storeCoss_ConsiderStoreEntry(rb, s.key, &s);
+                                rb->counts.scancount++;
+                }
+                i += sizeof(storeSwapLogData);
+        }
+        debug(47, 5) ("storeCossRebuildHelperRead: %s: read %d entries\n",  
sd->path, i / sizeof(storeSwapLogData));
+
+        /* Shuffle what is left to the beginning of the buffer */
+        if (i < rb->rbuf.used) {
+                memmove(rb->rbuf.buf, rb->rbuf.buf + i, rb->rbuf.used - i);
+                rb->rbuf.used -= i;
+        }

-    if (cs->rebuild.buflen == 0) {
-       debug(47, 3) ("COSS: %s: stripe %d: read 0 bytes, skipping stripe\n",  
stripePath(SD), cs->rebuild.curstripe);
-       return;
-    }
-    while (j < cs->rebuild.buflen) {
-       l = NULL;
-       bl = 0;
-       /* XXX there's no bounds checking on the buffer being passed into  
storeSwapMetaUnpack! */
-       tlv_list = storeSwapMetaUnpack(cs->rebuild.buf + j, &bl);
-       if (tlv_list == NULL) {
-           debug(47, 3) ("COSS: %s: stripe %d: offset %d gives NULL swapmeta  
data; end of stripe\n", stripePath(SD), cs->rebuild.curstripe, j);
-           return;
-       }
-       filen = (off_t) j / (off_t) blocksize + (off_t) ((off_t)  
cs->rebuild.curstripe * (off_t) COSS_MEMBUF_SZ / (off_t) blocksize);
-       debug(47, 3) ("COSS: %s: stripe %d: filen %d: header size %d\n",  
stripePath(SD), cs->rebuild.curstripe, filen, bl);
-
-       /* COSS objects will have an object size written into the metadata */
-       memset(&tmpe, 0, sizeof(tmpe));
-       memset(key, 0, sizeof(key));
-       for (t = tlv_list; t; t = t->next) {
-           switch (t->type) {
-           case STORE_META_URL:
-               debug(47, 3) ("    URL: %s\n", (char *) t->value);
-               break;
-           case STORE_META_OBJSIZE:
-               l = t->value;
-               debug(47, 3) ("Size: %" PRINTF_OFF_T " (len %d)\n", *l, 
t->length);
-               break;
-           case STORE_META_KEY:
-               if (t->length != SQUID_MD5_DIGEST_LENGTH) {
-                   debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid  
STORE_META_KEY length. Ignoring object.\n", stripePath(SD),  
cs->rebuild.curstripe, j);
-                   goto nextobject;
-               }
-               xmemcpy(key, t->value, SQUID_MD5_DIGEST_LENGTH);
-               break;
-#if SIZEOF_SQUID_FILE_SZ == SIZEOF_SIZE_T
-           case STORE_META_STD:
-               if (t->length != STORE_HDR_METASIZE) {
-                   debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid  
STORE_META_STD length. Ignoring object.\n", stripePath(SD),  
cs->rebuild.curstripe, j);
-                   goto nextobject;
-               }
-               xmemcpy(&tmpe.timestamp, t->value, STORE_HDR_METASIZE);
-               break;
-#else
-           case STORE_META_STD_LFS:
-               if (t->length != STORE_HDR_METASIZE) {
-                   debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid  
STORE_META_STD_LFS length. Ignoring object.\n", stripePath(SD),  
cs->rebuild.curstripe, j);
-                   goto nextobject;
-               }
-               xmemcpy(&tmpe.timestamp, t->value, STORE_HDR_METASIZE);
-               break;
-           case STORE_META_STD:
-               if (t->length != STORE_HDR_METASIZE_OLD) {
-                   debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid  
STORE_META_STD length. Ignoring object.\n", stripePath(SD),  
cs->rebuild.curstripe, j);
-                   goto nextobject;
-               } {
-                   struct {
-                       time_t timestamp;
-                       time_t lastref;
-                       time_t expires;
-                       time_t lastmod;
-                       size_t swap_file_sz;
-                       u_short refcount;
-                       u_short flags;
-                   }     *tmp = t->value;
-                   assert(sizeof(*tmp) == STORE_HDR_METASIZE_OLD);
-                   tmpe.timestamp = tmp->timestamp;
-                   tmpe.lastref = tmp->lastref;
-                   tmpe.expires = tmp->expires;
-                   tmpe.lastmod = tmp->lastmod;
-                   tmpe.swap_file_sz = tmp->swap_file_sz;
-                   tmpe.refcount = tmp->refcount;
-                   tmpe.flags = tmp->flags;
-               }
-               break;
-#endif
-           }
-       }
-       /* Make sure we have an object; if we don't then it may be an 
indication  
of trouble */
-       if (l == NULL) {
-           debug(47, 3) ("COSS: %s: stripe %d: Object with no size; end of  
stripe\n", stripePath(SD), cs->rebuild.curstripe);
-           storeSwapTLVFree(tlv_list);
-           return;
-       }
-       len = *l;
-       /* Finally, make sure there's enough data left in this stripe to 
satisfy  
the object
-        * we've just been informed about
-        */
-       if ((cs->rebuild.buflen - j) < (len + bl)) {
-           debug(47, 3) ("COSS: %s: stripe %d: Not enough data in this stripe  
for this object, bye bye.\n", stripePath(SD), cs->rebuild.curstripe);
-           storeSwapTLVFree(tlv_list);
-           return;
-       }
-       /* Houston, we have an object */
-       if (storeKeyNull(key)) {
-           debug(47, 3) ("COSS: %s: stripe %d: null data, next!\n",  
stripePath(SD), cs->rebuild.curstripe);
-           goto nextobject;
-       }
-       rb->counts.scancount++;
-       tmpe.hash.key = key;
-       /* Check sizes */
-       if (tmpe.swap_file_sz == 0) {
-           tmpe.swap_file_sz = len + bl;
-       }
-       if (tmpe.swap_file_sz != (len + bl)) {
-           debug(47, 3) ("COSS: %s: stripe %d: file size mismatch (%"  
PRINTF_OFF_T " != %" PRINTF_OFF_T ")\n", stripePath(SD),  
cs->rebuild.curstripe, tmpe.swap_file_sz, len);
-           goto nextobject;
-       }
-       if (EBIT_TEST(tmpe.flags, KEY_PRIVATE)) {
-           debug(47, 3) ("COSS: %s: stripe %d: private key flag set,  
ignoring.\n", stripePath(SD), cs->rebuild.curstripe);
-           rb->counts.badflags++;
-           goto nextobject;
-       }
-       /* Time to consider the object! */
-       tmpe.swap_filen = filen;
-       tmpe.swap_dirn = SD->index;
-
-       debug(47, 3) ("COSS: %s Considering filneumber %d\n", stripePath(SD),  
tmpe.swap_filen);
-       storeCoss_ConsiderStoreEntry(rb, key, &tmpe);
-
-      nextobject:
-       /* Free the TLV data */
-       storeSwapTLVFree(tlv_list);
-       tlv_list = NULL;
-
-       /* Now, advance to the next block-aligned offset after this object */
-       j = j + len + bl;
-       /* And now, the blocksize! */
-       tmp = j / blocksize;
-       tmp = (tmp + 1) * blocksize;
-       j = tmp;
-    }
+        /* Re-register */
+        commSetSelect(rb->helper.r_fd, COMM_SELECT_READ,  
storeCossRebuildHelperRead, rb, 0);
  }

  static void
-storeDirCoss_ReadStripeComplete(int fd, void *my_data, const char *buf,  
int aio_return, int aio_errno)
+storeDirCoss_StartDiskRebuild(RebuildState * rb)
  {
-    RebuildState *rb = my_data;
      SwapDir *SD = rb->sd;
      CossInfo *cs = SD->fsdata;
-    int r_errflag;
-    int r_len;
-    r_len = aio_return;
-    if (aio_errno)
-       r_errflag = aio_errno == ENOSPC ? DISK_NO_SPACE_LEFT : DISK_ERROR;
-    else
-       r_errflag = DISK_OK;
-    xmemcpy(cs->rebuild.buf, buf, r_len);
-
-    debug(47, 2) ("COSS: %s: stripe %d, read %d bytes, status %d\n",  
stripePath(SD), cs->rebuild.curstripe, r_len, r_errflag);
-    cs->rebuild.reading = 0;
-    if (r_errflag != DISK_OK) {
-       debug(47, 2) ("COSS: %s: stripe %d: error! Ignoring objects in this  
stripe.\n", stripePath(SD), cs->rebuild.curstripe);
-       goto nextstripe;
-    }
-    cs->rebuild.buflen = r_len;
-    /* parse the stripe contents */
-    /*
-     * XXX note: the read should be put before the parsing so they can  
happen
-     * simultaneously. This'll require some code-shifting so the read  
buffer
-     * and parse buffer are different. This might speed up the read speed;
-     * the disk throughput isn't being reached at the present.
-     */
-    storeDirCoss_ParseStripeBuffer(rb);
+    const char * args[8];
+    char num_stripes[128], block_size[128], stripe_size[128];

-  nextstripe:
-    cs->rebuild.curstripe++;
-    if (cs->rebuild.curstripe >= cs->numstripes) {
-       /* Completed the rebuild - move onto the next phase */
-       debug(47, 2) ("COSS: %s: completed reading the stripes.\n",  
stripePath(SD));
-       storeCossRebuildComplete(rb);
-       return;
-    } else {
-       /* Next stripe */
-       storeDirCoss_ReadStripe(rb);
-    }
-}
+    /* Open the rebuild helper */
+    snprintf(num_stripes, sizeof(num_stripes) - 1, "%d", cs->numstripes);
+    snprintf(block_size, sizeof(block_size) - 1, "%d", 1 <<  
cs->blksz_bits);
+    snprintf(stripe_size, sizeof(stripe_size) - 1, "%d", COSS_MEMBUF_SZ);
+
+    args[0] = Config.Program.coss_log_build;
+    args[1] = "rebuild";
+    args[2] = SD->path;
+    args[3] = block_size;
+    args[4] = stripe_size;
+    args[5] = num_stripes;
+    args[6] = NULL;

-static void
-storeDirCoss_ReadStripe(RebuildState * rb)
-{
-    SwapDir *SD = rb->sd;
-    CossInfo *cs = SD->fsdata;
+    debug(47, 2) ("COSS: %s: Beginning disk rebuild.\n", stripePath(SD));

-    assert(cs->rebuild.reading == 0);
-    cs->rebuild.reading = 1;
-    /* Use POSIX AIO for now */
-    debug(47, 2) ("COSS: %s: reading stripe %d\n", stripePath(SD),  
cs->rebuild.curstripe);
-    if (cs->rebuild.curstripe > rb->report_current) {
-       debug(47, 1) ("COSS: %s: Rebuilding (%d %% completed - %d/%d 
stripes)\n",  
stripePath(SD),
-           cs->rebuild.curstripe * 100 / cs->numstripes, 
cs->rebuild.curstripe,  
cs->numstripes);
-       rb->report_current += rb->report_interval;
-    }
-    /* XXX this should be a prime candidate to use a modified aioRead  
which doesn't malloc a damned buffer */
-    aioRead(cs->fd, (off_t) cs->rebuild.curstripe * COSS_MEMBUF_SZ,  
COSS_MEMBUF_SZ, storeDirCoss_ReadStripeComplete, rb);
-}
+    rb->helper.pid = ipcCreate(IPC_STREAM, Config.Program.coss_log_build,  
args, "coss_rebuild helper",
+      0, &rb->helper.r_fd, &rb->helper.w_fd, NULL);
+    assert(rb->helper.pid != -1);
+
+    /* Setup incoming read buffer */
+    /* XXX eww, this should really be in a producer/consumer library  
damnit */
+    rb->rbuf.buf = xmalloc(65536);
+    rb->rbuf.size = 65536;
+    rb->rbuf.used = 0;
+
+    /* Register for read interest */
+    commSetSelect(rb->helper.r_fd, COMM_SELECT_READ,  
storeCossRebuildHelperRead, rb, 0);

-static void
-storeDirCoss_StartDiskRebuild(RebuildState * rb)
-{
-    SwapDir *SD = rb->sd;
-    CossInfo *cs = SD->fsdata;
      assert(cs->rebuild.rebuilding == 0);
      assert(cs->numstripes > 0);
-    assert(cs->rebuild.buf == NULL);
      assert(cs->fd >= 0);
+
      cs->rebuild.rebuilding = 1;
-    cs->rebuild.curstripe = 0;
-    cs->rebuild.buf = xmalloc(COSS_MEMBUF_SZ);
-    rb->report_interval = cs->numstripes / COSS_REPORT_INTERVAL;
-    rb->report_current = 0;
-    debug(47, 2) ("COSS: %s: Beginning disk rebuild.\n", stripePath(SD));
-    storeDirCoss_ReadStripe(rb);
+
  }

  CBDATA_TYPE(RebuildState);

Modified: playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h       (original)
+++ playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h       Sun Jul 19 06:41:02 2009
@@ -6,8 +6,6 @@
      SwapDir *sd;
      int n_read;
      FILE *log;
-    int report_interval;
-    int report_current;
      struct {
          unsigned int clean:1;
      } flags;
@@ -18,6 +16,15 @@
          int fresher;
          int unknown;
      } cosscounts;
+    struct {
+       int r_fd, w_fd;
+       pid_t pid;
+    } helper;
+    struct {
+        char *buf;
+        int size;
+        int used;
+   } rbuf;
  };

  extern void storeCossDirRebuild(SwapDir * sd);

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"lusca-commit" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/lusca-commit?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to