Author: adrian.chadd
Date: Thu Jul 9 20:56:55 2009
New Revision: 14169
Modified:
playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.c
playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.h
Log:
Replace the storedir rebuild code with a call to the helper.
* open and close the helper using ipcCreate() / ipcClose()
* let the helper choose which is the best way to rebuild the storedir
* create a simple producer/consumer buffer (which should really be in
a library! grr adrian) to handle the incoming objects to be handled.
The rebuild path still uses a non-threaded/non-async buffered write to
write to the temporary swaplog - so performance is still not anywhere
near what is possible.
The "rebuilding percentage" stuff isn't yet done either. I'm going to
rely on the helper inserting PROGRESS swaplog messages in the stream
to display. This hasn't yet been coded.
The actual old code hasn't been taken out - its been #if 0'ed out for now.
I'll do a drive by removal in a moment.
There is no handling of the COMPLETED swaplog message so it shows up
as a bad swaplog entry.
The rebuild process MUST finish on a COMPLETED message instead of an EOF;
that way Lusca knows the entire contents of the storedir have been slurped
in as best possible. I'll fix that in a subsequent commit.
Modified: playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.c
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.c
(original)
+++ playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.c Thu Jul
9 20:56:55 2009
@@ -77,9 +77,13 @@
store_dirs_rebuilding--;
storeAufsDirCloseTmpSwapLog(rb->sd);
storeRebuildComplete(&rb->counts);
+ if (rb->helper.pid != -1)
+ ipcClose(rb->helper.pid, rb->helper.r_fd, rb->helper.w_fd);
+ safe_free(rb->rbuf.buf);
cbdataFree(rb);
}
+#if 0
static int
storeAufsDirGetNextFile(RebuildState * rb, sfileno * filn_p, int *size)
{
@@ -166,6 +170,7 @@
*filn_p = rb->fn;
return fd;
}
+#endif
/* Add a new object to the cache with empty memory copy and pointer to disk
* use to rebuild store from disk. */
@@ -210,7 +215,7 @@
return e;
}
-
+#if 0
static void
storeAufsDirRebuildFromDirectory(void *data)
{
@@ -375,6 +380,7 @@
}
eventAdd("storeRebuild", storeAufsDirRebuildFromDirectory, rb, 0.0, 1);
}
+#endif
static int
storeAufsDirRebuildFromSwapLogObject(RebuildState *rb, storeSwapLogData s)
@@ -548,6 +554,7 @@
eventAdd("storeRebuild", storeAufsDirRebuildFromSwapLog, rb, 0.0, 1);
}
+#if 0
static void
storeAufsDirRebuildFromSwapLogCheckVersion(void *data)
{
@@ -592,6 +599,54 @@
eventAdd("storeRebuild", storeAufsDirRebuildFromSwapLog, rb, 0.0, 1);
#endif
}
+#endif
+
+static void
+storeAufsRebuildHelperRead(int fd, void *data)
+{
+ RebuildState *rb = data;
+ SwapDir *sd = rb->sd;
+ /* squidaioinfo_t *aioinfo = (squidaioinfo_t *) sd->fsdata; */
+ int r, i;
+ storeSwapLogData s;
+
+ assert(fd == rb->helper.r_fd);
+ debug(47, 5) ("storeAufsRebuildHelperRead: %s: ready for helper
read\n",
sd->path);
+
+ assert(rb->rbuf.size - rb->rbuf.used > 0);
+ debug(47, 8) ("storeAufsRebuildHelperRead: %s: trying to read %d
bytes\n", sd->path, rb->rbuf.size - rb->rbuf.used);
+ r = FD_READ_METHOD(fd, rb->rbuf.buf + rb->rbuf.used, rb->rbuf.size -
rb->rbuf.used);
+ debug(47, 8) ("storeAufsRebuildHelperRead: %s: read %d bytes\n",
sd->path, r);
+ if (r <= 0) {
+ /* Error or EOF */
+ debug(47, 1) ("storeAufsRebuildHelperRead: %s: read returned
%d;
error/eof?\n", sd->path, r);
+ ipcClose(rb->helper.pid, rb->helper.r_fd, rb->helper.w_fd);
+ rb->helper.pid = rb->helper.r_fd = rb->helper.w_fd = -1;
+ storeAufsDirRebuildComplete(rb);
+ return;
+ }
+ rb->rbuf.used += r;
+
+ /* We have some data; process what we can */
+ i = 0;
+ while (i + sizeof(storeSwapLogData) < rb->rbuf.used) {
+ rb->n_read++;
+ rb->counts.scancount++;
+ memcpy(&s, rb->rbuf.buf + i, sizeof(storeSwapLogData));
+ storeAufsDirRebuildFromSwapLogObject(rb, s);
+ i += sizeof(storeSwapLogData);
+ }
+ debug(47, 5) ("storeAufsRebuildHelperRead: %s: read %d objects\n",
sd->path, i / sizeof(storeSwapLogData));
+
+ /* Shuffle what is left to the beginning of the buffer */
+ if (i < rb->rbuf.used) {
+ memmove(rb->rbuf.buf, rb->rbuf.buf + i, rb->rbuf.used - i);
+ rb->rbuf.used -= i;
+ }
+
+ /* Re-register */
+ commSetSelect(rb->helper.r_fd, COMM_SELECT_READ,
storeAufsRebuildHelperRead, rb, 0);
+}
CBDATA_TYPE(RebuildState);
@@ -614,10 +669,47 @@
int clean = 0;
int zero = 0;
int log_fd;
+#if 0
EVH *func = NULL;
+#endif
CBDATA_INIT_TYPE(RebuildState);
rb = cbdataAlloc(RebuildState);
rb->sd = sd;
+ const char * args[8];
+ char l1[128], l2[128];
+ squidaioinfo_t *aioinfo = (squidaioinfo_t *) sd->fsdata;
+
+ /* Open the rebuild helper */
+ snprintf(l1, sizeof(l1)-1, "%d", aioinfo->l1);
+ snprintf(l2, sizeof(l2)-1, "%d", aioinfo->l2);
+ args[0] = "(ufs rebuilding)";
+ args[1] = "rebuild";
+ args[2] = sd->path;
+ args[3] = l1;
+ args[4] = l2;
+ args[5] = xstrdup(storeAufsDirSwapLogFile(sd, NULL));
+ args[6] = NULL;
+
+ rb->helper.pid = ipcCreate(IPC_STREAM, Config.Program.ufs_log_build,
args, "ufs rebuilding",
+ 0, &rb->helper.r_fd, &rb->helper.w_fd, NULL);
+ assert(rb->helper.pid != -1);
+ safe_free(args[5]);
+
+ /* Setup incoming read buffer */
+ /* XXX eww, this should really be in a producer/consumer library
damnit */
+ rb->rbuf.buf = xmalloc(65536);
+ rb->rbuf.size = 65536;
+ rb->rbuf.used = 0;
+
+ /* Register for read interest */
+ commSetSelect(rb->helper.r_fd, COMM_SELECT_READ,
storeAufsRebuildHelperRead, rb, 0);
+
+ /* aaand we begin */
+ log_fd = storeAufsDirOpenTmpSwapLog(sd, &clean, &zero);
+ file_close(log_fd); /* We don't need this open anyway..? */
+
+#if 0
+
rb->speed = opt_foreground_rebuild ? 1 << 30 : 50;
/*
* If the swap.state file exists in the cache_dir, then
@@ -625,7 +717,6 @@
* use storeAufsDirRebuildFromDirectory() to open up each file
* and suck in the meta data.
*/
- log_fd = storeAufsDirOpenTmpSwapLog(sd, &clean, &zero);
if (! log_fd || zero) {
if (log_fd)
file_close(log_fd);
@@ -635,7 +726,10 @@
rb->log_fd = log_fd;
rb->flags.clean = (unsigned int) clean;
}
+#endif
debug(47, 1) ("Rebuilding storage in %s (%s)\n", sd->path,
clean ? "CLEAN" : "DIRTY");
store_dirs_rebuilding++;
+#if 0
eventAdd("storeRebuild", func, rb, 0.0, 1);
+#endif
}
Modified: playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.h
==============================================================================
--- playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.h
(original)
+++ playpen/LUSCA_HEAD_storework/src/fs/aufs/store_rebuild_aufs.h Thu Jul
9 20:56:55 2009
@@ -22,6 +22,16 @@
char fullpath[SQUID_MAXPATHLEN];
char fullfilename[SQUID_MAXPATHLEN];
struct _store_rebuild_data counts;
+
+ struct {
+ int r_fd, w_fd;
+ pid_t pid;
+ } helper;
+ struct {
+ char *buf;
+ int size;
+ int used;
+ } rbuf;
};
extern void storeAufsDirRebuild(SwapDir * sd);
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"lusca-commit" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/lusca-commit?hl=en
-~----------~----~----~----~------~----~------~--~---