Author: adrian.chadd
Date: Sun Jul 19 06:41:02 2009
New Revision: 14212
Modified:
playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h
playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c
playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h
Log:
Add the new COSS rebuild code which uses the helper.
Modified: playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h (original)
+++ playpen/LUSCA_HEAD_storework/src/fs/coss/store_coss.h Sun Jul 19
06:41:02 2009
@@ -156,10 +156,6 @@
int curstripe;
struct {
char rebuilding;
- char reading;
- int curstripe;
- char *buf;
- int buflen;
} rebuild;
int max_disk_nf;
Modified: playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c
(original)
+++ playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.c Sun Jul
19 06:41:02 2009
@@ -37,11 +37,11 @@
#include "../../libasyncio/aiops.h"
#include "../../libasyncio/async_io.h"
+#include "../../libsqstore/store_log.h"
+
#include "store_coss.h"
#include "store_rebuild_coss.h"
-static void storeDirCoss_ReadStripe(RebuildState * rb);
-
static void
storeCossRebuildComplete(void *data)
{
@@ -53,44 +53,45 @@
storeRebuildComplete(&rb->counts);
debug(47, 1) ("COSS: %s: Rebuild Completed\n", stripePath(SD));
cs->rebuild.rebuilding = 0;
+ if (rb->helper.pid != -1)
+ ipcClose(rb->helper.pid, rb->helper.r_fd, rb->helper.w_fd);
+ safe_free(rb->rbuf.buf);
debug(47, 1) (" %d objects scanned, %d objects relocated, %d objects
fresher, %d objects ignored\n",
rb->counts.scancount, rb->cosscounts.reloc,
rb->cosscounts.fresher, rb->cosscounts.unknown);
cbdataFree(rb);
}
static void
-storeCoss_AddStoreEntry(RebuildState * rb, const cache_key * key,
StoreEntry * e)
+storeCoss_AddStoreEntry(RebuildState * rb, const cache_key * key,
storeSwapLogData *d)
{
StoreEntry *ne;
SwapDir *SD = rb->sd;
CossInfo *cs = SD->fsdata;
rb->counts.objcount++;
- /* The Passed-in store entry is temporary; don't bloody use it
directly! */
- assert(e->swap_dirn == SD->index);
+
ne = new_StoreEntry(STORE_ENTRY_WITHOUT_MEMOBJ, NULL);
ne->store_status = STORE_OK;
storeSetMemStatus(ne, NOT_IN_MEMORY);
ne->swap_status = SWAPOUT_DONE;
- ne->swap_filen = e->swap_filen;
+ ne->swap_filen = d->swap_filen;
ne->swap_dirn = SD->index;
- ne->swap_file_sz = e->swap_file_sz;
+ ne->swap_file_sz = d->swap_file_sz;
ne->lock_count = 0;
- ne->lastref = e->lastref;
- ne->timestamp = e->timestamp;
- ne->expires = e->expires;
- ne->lastmod = e->lastmod;
- ne->refcount = e->refcount;
- ne->flags = e->flags;
+ ne->lastref = d->lastref;
+ ne->timestamp = d->timestamp;
+ ne->expires = d->expires;
+ ne->lastmod = d->lastmod;
+ ne->refcount = d->refcount;
+ ne->flags = d->flags;
EBIT_SET(ne->flags, ENTRY_CACHABLE);
EBIT_CLR(ne->flags, RELEASE_REQUEST);
EBIT_CLR(ne->flags, KEY_PRIVATE);
ne->ping_status = PING_NONE;
EBIT_CLR(ne->flags, ENTRY_VALIDATED);
storeHashInsert(ne, key); /* do it after we clear KEY_PRIVATE */
- storeCossAdd(SD, ne, cs->rebuild.curstripe);
+ storeCossAdd(SD, ne, storeCossFilenoToStripe(cs, d->swap_filen));
storeEntryDump(ne, 5);
assert(ne->repl.data != NULL);
- assert(e->repl.data == NULL);
}
static void
@@ -114,7 +115,7 @@
* stripe.
*/
static void
-storeCoss_ConsiderStoreEntry(RebuildState * rb, const cache_key * key,
StoreEntry * e)
+storeCoss_ConsiderStoreEntry(RebuildState * rb, const cache_key * key,
storeSwapLogData * d)
{
StoreEntry *oe;
@@ -122,9 +123,9 @@
oe = storeGet(key);
if (oe == NULL) {
rb->cosscounts.new++;
- debug(47, 3) ("COSS: Adding filen %d\n", e->swap_filen);
+ debug(47, 3) ("COSS: Adding filen %d\n", d->swap_filen);
/* no clash! woo, can add and forget */
- storeCoss_AddStoreEntry(rb, key, e);
+ storeCoss_AddStoreEntry(rb, key, d);
return;
}
/* This isn't valid - its possible we have a fresher object in another
store */
@@ -134,267 +135,135 @@
/* Dang, its a clash. See if its fresher */
/* Fresher? Its a new object: deallocate the old one, reallocate the
new one */
- if (e->lastref > oe->lastref) {
- debug(47, 3) ("COSS: fresher object for filen %d found (%ld -> %ld)\n",
oe->swap_filen, (long int) oe->timestamp, (long int) e->timestamp);
+ if (d->lastref > oe->lastref) {
+ debug(47, 3) ("COSS: fresher object for filen %d found (%ld -> %ld)\n",
oe->swap_filen, (long int) oe->timestamp, (long int) d->timestamp);
rb->cosscounts.fresher++;
storeCoss_DeleteStoreEntry(rb, key, oe);
oe = NULL;
- storeCoss_AddStoreEntry(rb, key, e);
+ storeCoss_AddStoreEntry(rb, key, d);
return;
}
/*
* Not fresher? Its the same object then we /should/ probably relocate
it; I'm
* not sure what should be done here.
*/
- if (oe->timestamp == e->timestamp && oe->expires == e->expires) {
- debug(47, 3) ("COSS: filen %d -> %d (since they're the same!)\n",
oe->swap_filen, e->swap_filen);
+ if (oe->timestamp == d->timestamp && oe->expires == d->expires) {
+ debug(47, 3) ("COSS: filen %d -> %d (since they're the same!)\n",
oe->swap_filen, d->swap_filen);
rb->cosscounts.reloc++;
storeCoss_DeleteStoreEntry(rb, key, oe);
oe = NULL;
- storeCoss_AddStoreEntry(rb, key, e);
+ storeCoss_AddStoreEntry(rb, key, d);
return;
}
- debug(47, 3) ("COSS: filen %d: ignoring this one for some reason\n",
e->swap_filen);
+ debug(47, 3) ("COSS: filen %d: ignoring this one for some reason\n",
d->swap_filen);
rb->cosscounts.unknown++;
}
-
-
/*
- * Take a stripe and attempt to place objects into it
+ * This is a cut-and-paste from the AUFS helper read function.
+ * Tsk!
*/
static void
-storeDirCoss_ParseStripeBuffer(RebuildState * rb)
+storeCossRebuildHelperRead(int fd, void *data)
{
- SwapDir *SD = rb->sd;
- CossInfo *cs = SD->fsdata;
- tlv *t, *tlv_list;
- int j = 0;
- int bl = 0;
- int tmp;
- squid_off_t *l, len = 0;
- int blocksize = cs->blksz_mask + 1;
- StoreEntry tmpe;
- cache_key key[SQUID_MD5_DIGEST_LENGTH];
- sfileno filen;
-
- assert(cs->rebuild.rebuilding == 1);
- assert(cs->numstripes > 0);
- assert(cs->rebuild.buf != NULL);
+ RebuildState *rb = data;
+ SwapDir *sd = rb->sd;
+ /* squidaioinfo_t *aioinfo = (squidaioinfo_t *) sd->fsdata; */
+ int r, i;
+ storeSwapLogData s;
+
+ assert(fd == rb->helper.r_fd);
+ debug(47, 5) ("storeCossRebuildHelperRead: %s: ready for helper
read\n", sd->path);
+
+ assert(rb->rbuf.size - rb->rbuf.used > 0);
+ debug(47, 8) ("storeCossRebuildHelperRead: %s: trying to read %d
bytes\n", sd->path, rb->rbuf.size - rb->rbuf.used);
+ r = FD_READ_METHOD(fd, rb->rbuf.buf + rb->rbuf.used, rb->rbuf.size
- rb->rbuf.used);
+ debug(47, 8) ("storeCossRebuildHelperRead: %s: read %d bytes\n",
sd->path, r);
+ if (r <= 0) {
+ /* Error or EOF */
+ debug(47, 1) ("storeCossRebuildHelperRead: %s: read
returned %d; error/eof?\n", sd->path, r);
+ storeCossRebuildComplete(rb);
+ return;
+ }
+ rb->rbuf.used += r;
+
+ /* We have some data; process what we can */
+ i = 0;
+ while (i + sizeof(storeSwapLogData) <= rb->rbuf.used) {
+ memcpy(&s, rb->rbuf.buf + i, sizeof(storeSwapLogData));
+ switch (s.op) {
+ case SWAP_LOG_VERSION:
+ break;
+ case SWAP_LOG_PROGRESS:
+ storeRebuildProgress(rb->sd->index,
+ ((storeSwapLogProgress *)(&s))->total,
((storeSwapLogProgress *)(&s))->progress);
+ break;
+ case SWAP_LOG_COMPLETED:
+ debug(47, 1) (" %s: completed rebuild\n",
sd->path);
+ storeCossRebuildComplete(rb);
+ return;
+ default:
+ rb->n_read++;
+ /*
storeAufsDirRebuildFromSwapLogObject(rb, s); */
+ storeCoss_ConsiderStoreEntry(rb, s.key, &s);
+ rb->counts.scancount++;
+ }
+ i += sizeof(storeSwapLogData);
+ }
+ debug(47, 5) ("storeCossRebuildHelperRead: %s: read %d entries\n",
sd->path, i / sizeof(storeSwapLogData));
+
+ /* Shuffle what is left to the beginning of the buffer */
+ if (i < rb->rbuf.used) {
+ memmove(rb->rbuf.buf, rb->rbuf.buf + i, rb->rbuf.used - i);
+ rb->rbuf.used -= i;
+ }
- if (cs->rebuild.buflen == 0) {
- debug(47, 3) ("COSS: %s: stripe %d: read 0 bytes, skipping stripe\n",
stripePath(SD), cs->rebuild.curstripe);
- return;
- }
- while (j < cs->rebuild.buflen) {
- l = NULL;
- bl = 0;
- /* XXX there's no bounds checking on the buffer being passed into
storeSwapMetaUnpack! */
- tlv_list = storeSwapMetaUnpack(cs->rebuild.buf + j, &bl);
- if (tlv_list == NULL) {
- debug(47, 3) ("COSS: %s: stripe %d: offset %d gives NULL swapmeta
data; end of stripe\n", stripePath(SD), cs->rebuild.curstripe, j);
- return;
- }
- filen = (off_t) j / (off_t) blocksize + (off_t) ((off_t)
cs->rebuild.curstripe * (off_t) COSS_MEMBUF_SZ / (off_t) blocksize);
- debug(47, 3) ("COSS: %s: stripe %d: filen %d: header size %d\n",
stripePath(SD), cs->rebuild.curstripe, filen, bl);
-
- /* COSS objects will have an object size written into the metadata */
- memset(&tmpe, 0, sizeof(tmpe));
- memset(key, 0, sizeof(key));
- for (t = tlv_list; t; t = t->next) {
- switch (t->type) {
- case STORE_META_URL:
- debug(47, 3) (" URL: %s\n", (char *) t->value);
- break;
- case STORE_META_OBJSIZE:
- l = t->value;
- debug(47, 3) ("Size: %" PRINTF_OFF_T " (len %d)\n", *l,
t->length);
- break;
- case STORE_META_KEY:
- if (t->length != SQUID_MD5_DIGEST_LENGTH) {
- debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid
STORE_META_KEY length. Ignoring object.\n", stripePath(SD),
cs->rebuild.curstripe, j);
- goto nextobject;
- }
- xmemcpy(key, t->value, SQUID_MD5_DIGEST_LENGTH);
- break;
-#if SIZEOF_SQUID_FILE_SZ == SIZEOF_SIZE_T
- case STORE_META_STD:
- if (t->length != STORE_HDR_METASIZE) {
- debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid
STORE_META_STD length. Ignoring object.\n", stripePath(SD),
cs->rebuild.curstripe, j);
- goto nextobject;
- }
- xmemcpy(&tmpe.timestamp, t->value, STORE_HDR_METASIZE);
- break;
-#else
- case STORE_META_STD_LFS:
- if (t->length != STORE_HDR_METASIZE) {
- debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid
STORE_META_STD_LFS length. Ignoring object.\n", stripePath(SD),
cs->rebuild.curstripe, j);
- goto nextobject;
- }
- xmemcpy(&tmpe.timestamp, t->value, STORE_HDR_METASIZE);
- break;
- case STORE_META_STD:
- if (t->length != STORE_HDR_METASIZE_OLD) {
- debug(47, 1) ("COSS: %s: stripe %d: offset %d has invalid
STORE_META_STD length. Ignoring object.\n", stripePath(SD),
cs->rebuild.curstripe, j);
- goto nextobject;
- } {
- struct {
- time_t timestamp;
- time_t lastref;
- time_t expires;
- time_t lastmod;
- size_t swap_file_sz;
- u_short refcount;
- u_short flags;
- } *tmp = t->value;
- assert(sizeof(*tmp) == STORE_HDR_METASIZE_OLD);
- tmpe.timestamp = tmp->timestamp;
- tmpe.lastref = tmp->lastref;
- tmpe.expires = tmp->expires;
- tmpe.lastmod = tmp->lastmod;
- tmpe.swap_file_sz = tmp->swap_file_sz;
- tmpe.refcount = tmp->refcount;
- tmpe.flags = tmp->flags;
- }
- break;
-#endif
- }
- }
- /* Make sure we have an object; if we don't then it may be an
indication
of trouble */
- if (l == NULL) {
- debug(47, 3) ("COSS: %s: stripe %d: Object with no size; end of
stripe\n", stripePath(SD), cs->rebuild.curstripe);
- storeSwapTLVFree(tlv_list);
- return;
- }
- len = *l;
- /* Finally, make sure there's enough data left in this stripe to
satisfy
the object
- * we've just been informed about
- */
- if ((cs->rebuild.buflen - j) < (len + bl)) {
- debug(47, 3) ("COSS: %s: stripe %d: Not enough data in this stripe
for this object, bye bye.\n", stripePath(SD), cs->rebuild.curstripe);
- storeSwapTLVFree(tlv_list);
- return;
- }
- /* Houston, we have an object */
- if (storeKeyNull(key)) {
- debug(47, 3) ("COSS: %s: stripe %d: null data, next!\n",
stripePath(SD), cs->rebuild.curstripe);
- goto nextobject;
- }
- rb->counts.scancount++;
- tmpe.hash.key = key;
- /* Check sizes */
- if (tmpe.swap_file_sz == 0) {
- tmpe.swap_file_sz = len + bl;
- }
- if (tmpe.swap_file_sz != (len + bl)) {
- debug(47, 3) ("COSS: %s: stripe %d: file size mismatch (%"
PRINTF_OFF_T " != %" PRINTF_OFF_T ")\n", stripePath(SD),
cs->rebuild.curstripe, tmpe.swap_file_sz, len);
- goto nextobject;
- }
- if (EBIT_TEST(tmpe.flags, KEY_PRIVATE)) {
- debug(47, 3) ("COSS: %s: stripe %d: private key flag set,
ignoring.\n", stripePath(SD), cs->rebuild.curstripe);
- rb->counts.badflags++;
- goto nextobject;
- }
- /* Time to consider the object! */
- tmpe.swap_filen = filen;
- tmpe.swap_dirn = SD->index;
-
- debug(47, 3) ("COSS: %s Considering filneumber %d\n", stripePath(SD),
tmpe.swap_filen);
- storeCoss_ConsiderStoreEntry(rb, key, &tmpe);
-
- nextobject:
- /* Free the TLV data */
- storeSwapTLVFree(tlv_list);
- tlv_list = NULL;
-
- /* Now, advance to the next block-aligned offset after this object */
- j = j + len + bl;
- /* And now, the blocksize! */
- tmp = j / blocksize;
- tmp = (tmp + 1) * blocksize;
- j = tmp;
- }
+ /* Re-register */
+ commSetSelect(rb->helper.r_fd, COMM_SELECT_READ,
storeCossRebuildHelperRead, rb, 0);
}
static void
-storeDirCoss_ReadStripeComplete(int fd, void *my_data, const char *buf,
int aio_return, int aio_errno)
+storeDirCoss_StartDiskRebuild(RebuildState * rb)
{
- RebuildState *rb = my_data;
SwapDir *SD = rb->sd;
CossInfo *cs = SD->fsdata;
- int r_errflag;
- int r_len;
- r_len = aio_return;
- if (aio_errno)
- r_errflag = aio_errno == ENOSPC ? DISK_NO_SPACE_LEFT : DISK_ERROR;
- else
- r_errflag = DISK_OK;
- xmemcpy(cs->rebuild.buf, buf, r_len);
-
- debug(47, 2) ("COSS: %s: stripe %d, read %d bytes, status %d\n",
stripePath(SD), cs->rebuild.curstripe, r_len, r_errflag);
- cs->rebuild.reading = 0;
- if (r_errflag != DISK_OK) {
- debug(47, 2) ("COSS: %s: stripe %d: error! Ignoring objects in this
stripe.\n", stripePath(SD), cs->rebuild.curstripe);
- goto nextstripe;
- }
- cs->rebuild.buflen = r_len;
- /* parse the stripe contents */
- /*
- * XXX note: the read should be put before the parsing so they can
happen
- * simultaneously. This'll require some code-shifting so the read
buffer
- * and parse buffer are different. This might speed up the read speed;
- * the disk throughput isn't being reached at the present.
- */
- storeDirCoss_ParseStripeBuffer(rb);
+ const char * args[8];
+ char num_stripes[128], block_size[128], stripe_size[128];
- nextstripe:
- cs->rebuild.curstripe++;
- if (cs->rebuild.curstripe >= cs->numstripes) {
- /* Completed the rebuild - move onto the next phase */
- debug(47, 2) ("COSS: %s: completed reading the stripes.\n",
stripePath(SD));
- storeCossRebuildComplete(rb);
- return;
- } else {
- /* Next stripe */
- storeDirCoss_ReadStripe(rb);
- }
-}
+ /* Open the rebuild helper */
+ snprintf(num_stripes, sizeof(num_stripes) - 1, "%d", cs->numstripes);
+ snprintf(block_size, sizeof(block_size) - 1, "%d", 1 <<
cs->blksz_bits);
+ snprintf(stripe_size, sizeof(stripe_size) - 1, "%d", COSS_MEMBUF_SZ);
+
+ args[0] = Config.Program.coss_log_build;
+ args[1] = "rebuild";
+ args[2] = SD->path;
+ args[3] = block_size;
+ args[4] = stripe_size;
+ args[5] = num_stripes;
+ args[6] = NULL;
-static void
-storeDirCoss_ReadStripe(RebuildState * rb)
-{
- SwapDir *SD = rb->sd;
- CossInfo *cs = SD->fsdata;
+ debug(47, 2) ("COSS: %s: Beginning disk rebuild.\n", stripePath(SD));
- assert(cs->rebuild.reading == 0);
- cs->rebuild.reading = 1;
- /* Use POSIX AIO for now */
- debug(47, 2) ("COSS: %s: reading stripe %d\n", stripePath(SD),
cs->rebuild.curstripe);
- if (cs->rebuild.curstripe > rb->report_current) {
- debug(47, 1) ("COSS: %s: Rebuilding (%d %% completed - %d/%d
stripes)\n",
stripePath(SD),
- cs->rebuild.curstripe * 100 / cs->numstripes,
cs->rebuild.curstripe,
cs->numstripes);
- rb->report_current += rb->report_interval;
- }
- /* XXX this should be a prime candidate to use a modified aioRead
which doesn't malloc a damned buffer */
- aioRead(cs->fd, (off_t) cs->rebuild.curstripe * COSS_MEMBUF_SZ,
COSS_MEMBUF_SZ, storeDirCoss_ReadStripeComplete, rb);
-}
+ rb->helper.pid = ipcCreate(IPC_STREAM, Config.Program.coss_log_build,
args, "coss_rebuild helper",
+ 0, &rb->helper.r_fd, &rb->helper.w_fd, NULL);
+ assert(rb->helper.pid != -1);
+
+ /* Setup incoming read buffer */
+ /* XXX eww, this should really be in a producer/consumer library
damnit */
+ rb->rbuf.buf = xmalloc(65536);
+ rb->rbuf.size = 65536;
+ rb->rbuf.used = 0;
+
+ /* Register for read interest */
+ commSetSelect(rb->helper.r_fd, COMM_SELECT_READ,
storeCossRebuildHelperRead, rb, 0);
-static void
-storeDirCoss_StartDiskRebuild(RebuildState * rb)
-{
- SwapDir *SD = rb->sd;
- CossInfo *cs = SD->fsdata;
assert(cs->rebuild.rebuilding == 0);
assert(cs->numstripes > 0);
- assert(cs->rebuild.buf == NULL);
assert(cs->fd >= 0);
+
cs->rebuild.rebuilding = 1;
- cs->rebuild.curstripe = 0;
- cs->rebuild.buf = xmalloc(COSS_MEMBUF_SZ);
- rb->report_interval = cs->numstripes / COSS_REPORT_INTERVAL;
- rb->report_current = 0;
- debug(47, 2) ("COSS: %s: Beginning disk rebuild.\n", stripePath(SD));
- storeDirCoss_ReadStripe(rb);
+
}
CBDATA_TYPE(RebuildState);
Modified: playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h
(original)
+++ playpen/LUSCA_HEAD_storework/src/fs/coss/store_rebuild_coss.h Sun Jul
19 06:41:02 2009
@@ -6,8 +6,6 @@
SwapDir *sd;
int n_read;
FILE *log;
- int report_interval;
- int report_current;
struct {
unsigned int clean:1;
} flags;
@@ -18,6 +16,15 @@
int fresher;
int unknown;
} cosscounts;
+ struct {
+ int r_fd, w_fd;
+ pid_t pid;
+ } helper;
+ struct {
+ char *buf;
+ int size;
+ int used;
+ } rbuf;
};
extern void storeCossDirRebuild(SwapDir * sd);
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"lusca-commit" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/lusca-commit?hl=en
-~----------~----~----~----~------~----~------~--~---