Greetings,

This diff against head (f59558a04c5ad052dc03ceeda62ccf31f4ab0004) of

   https://github.com/OGAWAHirofumi/linux-tux3/tree/hirofumi-user

provides the optimized fsync code that was used to generate the
benchmark results here:

   https://lkml.org/lkml/2015/4/28/838
   "How fast can we fsync?"

This patch also applies to:

   https://github.com/OGAWAHirofumi/linux-tux3/tree/hirofumi

which is a 3.19 kernel cloned from mainline. (Preferred)

Build instructions are on the wiki:

   https://github.com/OGAWAHirofumi/linux-tux3/wiki

There is some slight skew in the instructions because this is
not on master yet.

****************************************************************
*****  Caveat: No out of space handling on this branch!  *******
*** If you run out of space you will get a mysterious assert ***
****************************************************************

Enjoy!

Daniel

diff --git a/fs/tux3/buffer.c b/fs/tux3/buffer.c
index ef0d917..a141687 100644
--- a/fs/tux3/buffer.c
+++ b/fs/tux3/buffer.c
@@ -29,7 +29,7 @@ TUX3_DEFINE_STATE_FNS(unsigned long, buf, BUFDELTA_AVAIL, 
BUFDELTA_BITS,
  * may not work on all arch (If set_bit() and cmpxchg() is not
  * exclusive, this has race).
  */
-static void tux3_set_bufdelta(struct buffer_head *buffer, int delta)
+void tux3_set_bufdelta(struct buffer_head *buffer, int delta)
 {
        unsigned long state, old_state;
 
diff --git a/fs/tux3/commit.c b/fs/tux3/commit.c
index 909a222..955c441a 100644
--- a/fs/tux3/commit.c
+++ b/fs/tux3/commit.c
@@ -289,12 +289,13 @@ static int commit_delta(struct sb *sb)
                req_flag |= REQ_NOIDLE | REQ_FLUSH | REQ_FUA;
        }
 
-       trace("commit %i logblocks", be32_to_cpu(sb->super.logcount));
+       trace("commit %i logblocks", logcount(sb));
        err = save_metablock(sb, req_flag);
        if (err)
                return err;
 
-       tux3_wake_delta_commit(sb);
+       if (!fsync_mode(sb))
+               tux3_wake_delta_commit(sb);
 
        /* Commit was finished, apply defered bfree. */
        return unstash(sb, &sb->defree, apply_defered_bfree);
@@ -314,8 +315,7 @@ static void post_commit(struct sb *sb, unsigned delta)
 
 static int need_unify(struct sb *sb)
 {
-       static unsigned crudehack;
-       return !(++crudehack % 3);
+       return logcount(sb) > 300; /* FIXME: should be based on bandwidth and 
tunable */
 }
 
 /* For debugging */
@@ -359,7 +359,7 @@ static int do_commit(struct sb *sb, int flags)
         * FIXME: there is no need to commit if normal inodes are not
         * dirty? better way?
         */
-       if (!(flags & __FORCE_DELTA) && !tux3_has_dirty_inodes(sb, delta))
+       if (0 && !(flags & __FORCE_DELTA) && !tux3_has_dirty_inodes(sb, delta))
                goto out;
 
        /* Prepare to wait I/O */
@@ -402,6 +402,7 @@ static int do_commit(struct sb *sb, int flags)
 #endif
 
        if ((!no_unify && need_unify(sb)) || (flags & __FORCE_UNIFY)) {
+               trace("unify %u, delta %u", sb->unify, delta);
                err = unify_log(sb);
                if (err)
                        goto error; /* FIXME: error handling */
diff --git a/fs/tux3/commit_flusher.c b/fs/tux3/commit_flusher.c
index 59d6781..31cd51e 100644
--- a/fs/tux3/commit_flusher.c
+++ b/fs/tux3/commit_flusher.c
@@ -198,6 +198,8 @@ long tux3_writeback(struct super_block *super, struct 
bdi_writeback *wb,
        if (work->reason == WB_REASON_SYNC)
                goto out;
 
+       trace("tux3_writeback, reason = %i", work->reason);
+       
        if (work->reason == WB_REASON_TUX3_PENDING) {
                struct tux3_wb_work *wb_work;
                /* Specified target delta for staging. */
@@ -343,3 +345,7 @@ static void schedule_flush_delta(struct sb *sb, struct 
delta_ref *delta_ref)
        sb->delta_pending++;
        wake_up_all(&sb->delta_transition_wq);
 }
+
+#ifdef __KERNEL__
+#include "commit_fsync.c"
+#endif
diff --git a/fs/tux3/commit_fsync.c b/fs/tux3/commit_fsync.c
new file mode 100644
index 0000000..9a59c59
--- /dev/null
+++ b/fs/tux3/commit_fsync.c
@@ -0,0 +1,341 @@
+/*
+ * Optimized fsync.
+ *
+ * Copyright (c) 2015 Daniel Phillips
+ */
+
+#include <linux/delay.h>
+
+static inline int fsync_pending(struct sb *sb)
+{
+       return atomic_read(&sb->fsync_pending);
+}
+
+static inline int delta_needed(struct sb *sb)
+{
+       return waitqueue_active(&sb->delta_transition_wq);
+}
+
+static inline int fsync_drain(struct sb *sb)
+{
+       return test_bit(TUX3_FSYNC_DRAIN_BIT, &sb->backend_state);
+}
+
+static inline unsigned fsync_group(struct sb *sb)
+{
+       return atomic_read(&sb->fsync_group);
+}
+
+static int suspend_transition(struct sb *sb)
+{
+       while (sb->suspended == NULL) {
+               if (!test_and_set_bit(TUX3_STATE_TRANSITION_BIT, 
&sb->backend_state)) {
+                       sb->suspended = delta_get(sb);
+                       return 1;
+               }
+               cpu_relax();
+       }
+       return 0;
+}
+
+static void resume_transition(struct sb *sb)
+{
+       delta_put(sb, sb->suspended);
+       sb->suspended = NULL;
+
+       if (need_unify(sb))
+               delta_transition(sb);
+
+       /* Make sure !suspended is visible before transition clear  */
+       smp_mb__before_atomic();
+       clear_bit(TUX3_STATE_TRANSITION_BIT, &sb->backend_state);
+       /* Make sure transition clear is visible  before drain clear */
+       smp_mb__before_atomic();
+       clear_bit(TUX3_FSYNC_DRAIN_BIT, &sb->backend_state);
+       wake_up_all(&sb->delta_transition_wq);
+}
+
+static void tux3_wait_for_free(struct sb *sb, unsigned delta)
+{
+       unsigned free_delta = delta + TUX3_MAX_DELTA;
+       /* FIXME: better to be killable */
+       wait_event(sb->delta_transition_wq,
+                  delta_after_eq(sb->delta_free, free_delta));
+}
+
+/*
+ * Write log and commit. (Mostly borrowed from do_commit)
+ *
+ * This needs specfic handling for the commit block, so
+ * maybe add an fsync flag to commit_delta.
+ */
+static int commit_fsync(struct sb *sb, unsigned delta, struct blk_plug *plug)
+{
+       write_btree(sb, delta);
+       write_log(sb);
+       blk_finish_plug(plug);
+       commit_delta(sb);
+       post_commit(sb, delta);
+       return 0;
+}
+
+enum { groups_per_commit = 4 };
+
+/*
+ * Backend fsync commit task, serialized with delta backend.
+ */
+void fsync_backend(struct work_struct *work)
+{
+       struct sb *sb = container_of(work, struct fsync_work, work)->sb;
+       struct syncgroup *back = &sb->fsync[(fsync_group(sb) - 1) % fsync_wrap];
+       struct syncgroup *front = &sb->fsync[fsync_group(sb) % fsync_wrap];
+       struct syncgroup *idle = &sb->fsync[(fsync_group(sb) + 1) % fsync_wrap];
+       unsigned back_delta = sb->suspended->delta - 1;
+       unsigned start = fsync_group(sb), groups = 0;
+       struct blk_plug plug;
+       int err; /* How to report?? */
+
+       trace("enter fsync backend, delta = %i", sb->suspended->delta);
+       tux3_start_backend(sb);
+       sb->flags |= SB_FSYNC_FLUSH_FLAG;
+
+       while (1) {
+               sb->ioinfo = NULL;
+               assert(list_empty(&tux3_sb_ddc(sb, back_delta)->dirty_inodes));
+               while (atomic_read(&front->busy)) {
+                       struct ioinfo ioinfo;
+                       unsigned i;
+                       /*
+                        * Verify that the tail of the group queue is idle in
+                        * the sense that all waiting fsyncs woke up and 
released
+                        * their busy counts. This busy wait is only theoretical
+                        * because fsync tasks have plenty of time to wake up
+                        * while the the next group commits to media, but handle
+                        * it anyway for completeness.
+                        */
+                       for (i = 0; atomic_read(&idle->busy); i++)
+                               usleep_range(10, 1000);
+                       if (i)
+                               tux3_warn(sb, "*** %u spins on queue full ***", 
i);
+                       reinit_completion(&idle->wait);
+
+                       /*
+                        * Bump the fsync group counter so fsync backend owns 
the
+                        * next group of fsync inodes and can walk stable lists
+                        * while new fsyncs go onto the new frontend lists.
+                        */
+                       spin_lock(&sb->fsync_lock);
+                       atomic_inc(&sb->fsync_group);
+                       spin_unlock(&sb->fsync_lock);
+
+                       back = front;
+                       front = idle;
+                       idle = &sb->fsync[(fsync_group(sb) + 1) % fsync_wrap];
+
+                       trace("fsync flush group %tu, queued = %i, busy = %i",
+                               back - sb->fsync, 
atomic_read(&sb->fsync_pending),
+                               atomic_read(&back->busy));
+
+                       if (!sb->ioinfo) {
+                               tux3_io_init(&ioinfo, REQ_SYNC);
+                               sb->ioinfo = &ioinfo;
+                               blk_start_plug(&plug);
+                       }
+
+                       /*
+                        * NOTE: this may flush same inode multiple times, and 
those
+                        * blocks are submitted under plugging. So, by 
reordering,
+                        * later requests by tux3_flush_inodes() can be flushed
+                        * before former submitted requests. We do page 
forking, and
+                        * don't free until commit, so reorder should not be 
problem.
+                        * But we should remember this surprise.
+                        */
+                       err = tux3_flush_inodes_list(sb, back_delta, 
&back->list);
+                       if (err) {
+                               tux3_warn(sb, "tux3_flush_inodes_list error 
%i!", -err);
+                               goto ouch;
+                       }
+                       list_splice_init(&back->list, &tux3_sb_ddc(sb, 
back_delta)->dirty_inodes);
+                       atomic_sub(atomic_read(&back->busy), 
&sb->fsync_pending);
+
+                       if (++groups < groups_per_commit && 
atomic_read(&front->busy)) {
+                               trace("fsync merge group %u", fsync_group(sb));
+                               continue;
+                       }
+
+                       commit_fsync(sb, back_delta, &plug);
+                       sb->ioinfo = NULL;
+                       wake_up_all(&sb->fsync_collide);
+
+                       /*
+                        * Wake up commit waiters for all groups in this commit.
+                        */
+                       trace("complete %i groups, %i to %i", groups, start, 
start + groups -1);
+                       for (i = 0; i < groups; i++) {
+                               struct syncgroup *done = &sb->fsync[(start + i) 
% fsync_wrap];
+                               complete_all(&done->wait);
+                       }
+
+                       if (!fsync_pending(sb) || delta_needed(sb) || 
need_unify(sb))
+                               set_bit(TUX3_FSYNC_DRAIN_BIT, 
&sb->backend_state);
+
+                       start = fsync_group(sb);
+                       groups = 0;
+               }
+
+               if (fsync_drain(sb) && !fsync_pending(sb))
+                       break;
+
+               usleep_range(10, 500);
+       }
+
+ouch:
+       tux3_end_backend();
+       sb->flags &= ~SB_FSYNC_FLUSH_FLAG;
+       resume_transition(sb);
+       trace("leave fsync backend, group = %i", fsync_group(sb));
+       return; /* FIXME: error? */
+}
+
+int tux3_sync_inode(struct sb *sb, struct inode *inode)
+{
+       void tux3_set_bufdelta(struct buffer_head *buffer, int delta);
+       struct tux3_inode *tuxnode = tux_inode(inode);
+       struct inode_delta_dirty *front_dirty, *back_dirty;
+       struct buffer_head *buffer;
+       struct syncgroup *front;
+       unsigned front_delta;
+       int err = 0, start_backend = 0;
+
+       trace("fsync inode %Lu", (long long)tuxnode->inum);
+
+       /*
+        * Prevent new fsyncs from queuing if fsync_backend wants to exit.
+        */
+       if (fsync_drain(sb))
+               wait_event(sb->delta_transition_wq, !fsync_drain(sb));
+
+       /*
+        * Prevent fsync_backend from exiting and delta from changing until
+        * this fsync is queued and flushed.
+        */
+       atomic_inc(&sb->fsync_pending);
+       start_backend = suspend_transition(sb);
+       front_delta = sb->suspended->delta;
+       front_dirty = tux3_inode_ddc(inode, front_delta);
+       back_dirty = tux3_inode_ddc(inode, front_delta - 1);
+       tux3_wait_for_free(sb, front_delta - 1);
+
+       /*
+        * If another fsync is in progress on this inode then wait to
+        * avoid block collisions.
+        */
+       if (tux3_inode_test_and_set_flag(TUX3_INODE_FSYNC_BIT, inode)) {
+               trace("parallel fsync of inode %Lu", (long long)tuxnode->inum);
+               if (start_backend) {
+                       queue_work(sb->fsync_workqueue, &sb->fsync_work.work);
+                       start_backend = 0;
+               }
+               err = wait_event_killable(sb->fsync_collide,
+                       !tux3_inode_test_and_set_flag(TUX3_INODE_FSYNC_BIT, 
inode));
+               if (err) {
+                       tux3_inode_clear_flag(TUX3_INODE_FSYNC_BIT, inode);
+                       atomic_dec(&sb->fsync_pending);
+                       goto fail;
+               }
+       }
+
+       /*
+        * We own INODE_FSYNC and the delta backend is not running so
+        * if inode is dirty here then it it will still be dirty when we
+        * move it to the backend dirty list. Otherwise, the inode is
+        * clean and fsync should exit here. We owned INODE_FSYNC for a
+        * short time so there might be tasks waiting on fsync_collide.
+        * Similarly, we might own FSYNC_RUNNING and therefore must start
+        * the fsync backend in case some other task failed to own it and
+        * therefore assumes it is running.
+        */
+       if (!tux3_dirty_flags1(inode, front_delta)) {
+               trace("inode %Lu is already clean", (long long)tuxnode->inum);
+               tux3_inode_clear_flag(TUX3_INODE_FSYNC_BIT, inode);
+               atomic_dec(&sb->fsync_pending);
+               if (start_backend)
+                       queue_work(sb->fsync_workqueue, &sb->fsync_work.work);
+               wake_up_all(&sb->fsync_collide);
+               return 0;
+       }
+
+       /*
+        * Exclude new dirties.
+        * Lock order: i_mutex => truncate_lock
+        */
+       mutex_lock(&inode->i_mutex); /* Exclude most dirty sources */
+       down_write(&tux_inode(inode)->truncate_lock); /* Exclude mmap */
+
+       /*
+        * Force block dirty state to previous delta for each dirty
+        * block so block fork protects block data against modify by
+        * parallel tasks while this task waits for commit.
+        *
+        * This walk should not discover any dirty blocks belonging
+        * to the previous delta due to the above wait for delta
+        * commit.
+        */
+       list_for_each_entry(buffer, &front_dirty->dirty_buffers, 
b_assoc_buffers) {
+               //assert(tux3_bufsta_get_delta(buffer->b_state) != delta - 1);
+               tux3_set_bufdelta(buffer, front_delta - 1);
+       }
+
+       /*
+        * Move the the front end dirty block list to the backend, which
+        * is now empty because the previous delta was completed. Remove
+        * the inode from the frontend dirty list and add it to the front
+        * fsync list. Note: this is not a list move because different
+        * link fields are involved. Later, the inode will be moved to
+        * the backend inode dirty list to be flushed but we cannot put
+        * it there right now because it might clobber the previous fsync
+        * group. Update the inode dirty flags to indicate the inode is
+        * dirty in the back, not the front. The list moves must be
+        * under the spin lock to prevent the back end from bumping
+        * the group counter and proceeding with the commit.
+        */
+       trace("fsync queue inode %Lu to group %u",
+               (long long)tuxnode->inum, fsync_group(sb));
+       spin_lock(&tuxnode->lock);
+       spin_lock(&sb->dirty_inodes_lock);
+       //assert(<inode is not dirty in back>);
+       assert(list_empty(&back_dirty->dirty_buffers));
+       assert(list_empty(&back_dirty->dirty_holes));
+       assert(!list_empty(&front_dirty->dirty_list));
+       list_splice_init(&front_dirty->dirty_buffers, 
&back_dirty->dirty_buffers);
+       list_splice_init(&front_dirty->dirty_holes, &back_dirty->dirty_holes);
+       list_del_init(&front_dirty->dirty_list);
+       spin_unlock(&sb->dirty_inodes_lock);
+
+       tux3_dirty_switch_to_prev(inode, front_delta);
+       spin_unlock(&tuxnode->lock);
+
+       spin_lock(&sb->fsync_lock);
+       front = &sb->fsync[fsync_group(sb) % fsync_wrap];
+       list_add_tail(&back_dirty->dirty_list, &front->list);
+       atomic_inc(&front->busy); /* detect queue full */
+       assert(sb->current_delta->delta == front_delta); /* last chance to 
check */
+       spin_unlock(&sb->fsync_lock);
+
+       /*
+        * Allow more dirties during the wait. These will be isolated from
+        * the commit by block forking.
+        */
+       up_write(&tux_inode(inode)->truncate_lock);
+       mutex_unlock(&inode->i_mutex);
+
+       if (start_backend)
+               queue_work(sb->fsync_workqueue, &sb->fsync_work.work);
+
+       wait_for_completion(&front->wait);
+       atomic_dec(&front->busy);
+fail:
+       if (err)
+               tux3_warn(sb, "error %i!!!", err);
+       return err;
+}
diff --git a/fs/tux3/iattr.c b/fs/tux3/iattr.c
index 57a383b..7ac73f5 100644
--- a/fs/tux3/iattr.c
+++ b/fs/tux3/iattr.c
@@ -276,6 +276,8 @@ static int iattr_decode(struct btree *btree, void *data, 
void *attrs, int size)
        }
 
        decode_attrs(inode, attrs, size); // error???
+       tux_inode(inode)->nlink_base = inode->i_nlink;
+
        if (tux3_trace)
                dump_attrs(inode);
        if (tux_inode(inode)->xcache)
diff --git a/fs/tux3/inode.c b/fs/tux3/inode.c
index f747c0e..a10ce38 100644
--- a/fs/tux3/inode.c
+++ b/fs/tux3/inode.c
@@ -922,22 +922,18 @@ void iget_if_dirty(struct inode *inode)
        atomic_inc(&inode->i_count);
 }
 
+enum { fsync_fallback = 0 };
+
 /* Synchronize changes to a file and directory. */
 int tux3_sync_file(struct file *file, loff_t start, loff_t end, int datasync)
 {
        struct inode *inode = file->f_mapping->host;
        struct sb *sb = tux_sb(inode->i_sb);
 
-       /* FIXME: this is sync(2). We should implement real one */
-       static int print_once;
-       if (!print_once) {
-               print_once++;
-               tux3_warn(sb,
-                         "fsync(2) fall-back to sync(2): %Lx-%Lx, datasync %d",
-                         start, end, datasync);
-       }
+       if (fsync_fallback || S_ISDIR(inode->i_mode))
+               return sync_current_delta(sb);
 
-       return sync_current_delta(sb);
+       return tux3_sync_inode(sb, inode);
 }
 
 int tux3_getattr(struct vfsmount *mnt, struct dentry *dentry, struct kstat 
*stat)
diff --git a/fs/tux3/log.c b/fs/tux3/log.c
index bb26c73..a934659 100644
--- a/fs/tux3/log.c
+++ b/fs/tux3/log.c
@@ -83,6 +83,7 @@ unsigned log_size[] = {
        [LOG_BNODE_FREE]        = 7,
        [LOG_ORPHAN_ADD]        = 9,
        [LOG_ORPHAN_DEL]        = 9,
+       [LOG_FSYNC_ORPHAN]      = 9,
        [LOG_FREEBLOCKS]        = 7,
        [LOG_UNIFY]             = 1,
        [LOG_DELTA]             = 1,
@@ -470,6 +471,11 @@ void log_bnode_free(struct sb *sb, block_t bnode)
        log_u48(sb, LOG_BNODE_FREE, bnode);
 }
 
+void log_fsync_orphan(struct sb *sb, unsigned version, tuxkey_t inum)
+{
+       log_u16_u48(sb, LOG_FSYNC_ORPHAN, version, inum);
+}
+
 /*
  * Handle inum as orphan inode
  * (this is log of frontend operation)
diff --git a/fs/tux3/orphan.c b/fs/tux3/orphan.c
index 68d08e8..3ea2d6a 100644
--- a/fs/tux3/orphan.c
+++ b/fs/tux3/orphan.c
@@ -336,7 +336,30 @@ static int load_orphan_inode(struct sb *sb, inum_t inum, 
struct list_head *head)
        tux3_mark_inode_orphan(tux_inode(inode));
        /* List inode up, then caller will decide what to do */
        list_add(&tux_inode(inode)->orphan_list, head);
+       return 0;
+}
 
+int replay_fsync_orphan(struct replay *rp, unsigned version, inum_t inum)
+{
+       struct sb *sb = rp->sb;
+       struct inode *inode = tux3_iget(sb, inum);
+       if (IS_ERR(inode)) {
+               int err = PTR_ERR(inode);
+               return err == -ENOENT ? 0 : err;
+       }
+
+       /*
+        * Multiple fsyncs of new inode can create multiple fsync orphan
+        * log records for the same inode. A later delta may have added a
+        * link.
+        */
+       if (inode->i_nlink != 0 || tux3_inode_is_orphan(tux_inode(inode))) {
+               iput(inode);
+               return 0;
+       }
+
+       tux3_mark_inode_orphan(tux_inode(inode));
+       list_add(&tux_inode(inode)->orphan_list, &rp->orphan_in_otree);
        return 0;
 }
 
diff --git a/fs/tux3/replay.c b/fs/tux3/replay.c
index f1f77e8..99361d6 100644
--- a/fs/tux3/replay.c
+++ b/fs/tux3/replay.c
@@ -29,6 +29,7 @@ static const char *const log_name[] = {
        X(LOG_BNODE_FREE),
        X(LOG_ORPHAN_ADD),
        X(LOG_ORPHAN_DEL),
+       X(LOG_FSYNC_ORPHAN),
        X(LOG_FREEBLOCKS),
        X(LOG_UNIFY),
        X(LOG_DELTA),
@@ -117,20 +118,20 @@ static void replay_unpin_logblocks(struct sb *sb, 
unsigned i, unsigned logcount)
 static struct replay *replay_prepare(struct sb *sb)
 {
        block_t logchain = be64_to_cpu(sb->super.logchain);
-       unsigned i, logcount = be32_to_cpu(sb->super.logcount);
+       unsigned i, count = logcount(sb);
        struct replay *rp;
        struct buffer_head *buffer;
        int err;
 
        /* FIXME: this address array is quick hack. Rethink about log
         * block management and log block address. */
-       rp = alloc_replay(sb, logcount);
+       rp = alloc_replay(sb, count);
        if (IS_ERR(rp))
                return rp;
 
        /* FIXME: maybe, we should use bufvec to read log blocks */
-       trace("load %u logblocks", logcount);
-       i = logcount;
+       trace("load %u logblocks", count);
+       i = count;
        while (i-- > 0) {
                struct logblock *log;
 
@@ -156,7 +157,7 @@ static struct replay *replay_prepare(struct sb *sb)
 
 error:
        free_replay(rp);
-       replay_unpin_logblocks(sb, i, logcount);
+       replay_unpin_logblocks(sb, i, count);
 
        return ERR_PTR(err);
 }
@@ -169,7 +170,7 @@ static void replay_done(struct replay *rp)
        clean_orphan_list(&rp->log_orphan_add); /* for error path */
        free_replay(rp);
 
-       sb->logpos.next = be32_to_cpu(sb->super.logcount);
+       sb->logpos.next = logcount(sb);
        replay_unpin_logblocks(sb, 0, sb->logpos.next);
        log_finish_cycle(sb, 0);
 }
@@ -319,6 +320,7 @@ static int replay_log_stage1(struct replay *rp, struct 
buffer_head *logbuf)
                case LOG_BFREE_RELOG:
                case LOG_LEAF_REDIRECT:
                case LOG_LEAF_FREE:
+               case LOG_FSYNC_ORPHAN:
                case LOG_ORPHAN_ADD:
                case LOG_ORPHAN_DEL:
                case LOG_UNIFY:
@@ -450,6 +452,7 @@ static int replay_log_stage2(struct replay *rp, struct 
buffer_head *logbuf)
                                return err;
                        break;
                }
+               case LOG_FSYNC_ORPHAN:
                case LOG_ORPHAN_ADD:
                case LOG_ORPHAN_DEL:
                {
@@ -459,6 +462,9 @@ static int replay_log_stage2(struct replay *rp, struct 
buffer_head *logbuf)
                        data = decode48(data, &inum);
                        trace("%s: version 0x%x, inum 0x%Lx",
                              log_name[code], version, inum);
+                       if (code == LOG_FSYNC_ORPHAN)
+                               err = replay_fsync_orphan(rp, version, inum);
+                       else
                        if (code == LOG_ORPHAN_ADD)
                                err = replay_orphan_add(rp, version, inum);
                        else
@@ -514,11 +520,11 @@ static int replay_logblocks(struct replay *rp, 
replay_log_t replay_log_func)
 {
        struct sb *sb = rp->sb;
        struct logpos *logpos = &sb->logpos;
-       unsigned logcount = be32_to_cpu(sb->super.logcount);
+       unsigned count = logcount(sb);
        int err;
 
        logpos->next = 0;
-       while (logpos->next < logcount) {
+       while (logpos->next < count) {
                trace("log block %i, blocknr %Lx, unify %Lx",
                      logpos->next, rp->blocknrs[logpos->next],
                      rp->unify_index);
diff --git a/fs/tux3/super.c b/fs/tux3/super.c
index b104dc7..0913d26 100644
--- a/fs/tux3/super.c
+++ b/fs/tux3/super.c
@@ -63,6 +63,7 @@ static void tux3_inode_init_always(struct tux3_inode *tuxnode)
        tuxnode->xcache         = NULL;
        tuxnode->generic        = 0;
        tuxnode->state          = 0;
+       tuxnode->nlink_base     = 0;
 #ifdef __KERNEL__
        tuxnode->io             = NULL;
 #endif
@@ -246,6 +247,9 @@ static void __tux3_put_super(struct sb *sbi)
        sbi->idefer_map = NULL;
        /* FIXME: add more sanity check */
        assert(link_empty(&sbi->forked_buffers));
+
+       if (sbi->fsync_workqueue)
+               destroy_workqueue(sbi->fsync_workqueue);
 }
 
 static struct inode *create_internal_inode(struct sb *sbi, inum_t inum,
@@ -384,6 +388,21 @@ static int init_sb(struct sb *sb)
        for (i = 0; i < ARRAY_SIZE(sb->s_ddc); i++)
                INIT_LIST_HEAD(&sb->s_ddc[i].dirty_inodes);
 
+       for (i = 0; i < fsync_wrap; i++) {
+               INIT_LIST_HEAD(&sb->fsync[i].list);
+               init_completion(&sb->fsync[i].wait);
+               atomic_set(&sb->fsync[i].busy, 0);
+       }
+
+       if (!(sb->fsync_workqueue = create_workqueue("tux3-work")))
+               return -ENOMEM;
+
+       atomic_set(&sb->fsync_group, 0);
+       atomic_set(&sb->fsync_pending, 0);
+       spin_lock_init(&sb->fsync_lock);
+       init_waitqueue_head(&sb->fsync_collide);
+       INIT_WORK(&sb->fsync_work.work, fsync_backend);
+       sb->fsync_work.sb = sb;
        sb->idefer_map = tux3_alloc_idefer_map();
        if (!sb->idefer_map)
                return -ENOMEM;
@@ -773,7 +792,7 @@ static int tux3_fill_super(struct super_block *sb, void 
*data, int silent)
                        goto error;
                }
        }
-       tux3_dbg("s_blocksize %lu", sb->s_blocksize);
+       tux3_dbg("s_blocksize %lu, sb = %p", sb->s_blocksize, tux_sb(sb));
 
        rp = tux3_init_fs(sbi);
        if (IS_ERR(rp)) {
@@ -781,6 +800,7 @@ static int tux3_fill_super(struct super_block *sb, void 
*data, int silent)
                goto error;
        }
 
+       sb->s_flags |= MS_ACTIVE;
        err = replay_stage3(rp, 1);
        if (err) {
                rp = NULL;
diff --git a/fs/tux3/tux3.h b/fs/tux3/tux3.h
index e2f2d9b..cf4bcc6 100644
--- a/fs/tux3/tux3.h
+++ b/fs/tux3/tux3.h
@@ -252,6 +252,7 @@ enum {
        LOG_BNODE_FREE,         /* Log of freeing bnode */
        LOG_ORPHAN_ADD,         /* Log of adding orphan inode */
        LOG_ORPHAN_DEL,         /* Log of deleting orphan inode */
+       LOG_FSYNC_ORPHAN,       /* Log inode fsync with no links  */
        LOG_FREEBLOCKS,         /* Log of freeblocks in bitmap on unify */
        LOG_UNIFY,              /* Log of marking unify */
        LOG_DELTA,              /* just for debugging */
@@ -310,6 +311,29 @@ struct tux3_mount_opt {
        unsigned int flags;
 };
 
+/* Per fsync group dirty inodes and synchronization */
+struct syncgroup {
+       struct list_head list; /* dirty inodes */
+       struct completion wait; /* commit wait */
+       atomic_t busy; /* fsyncs not completed */
+};
+
+struct fsync_work {
+       struct work_struct work;
+       struct sb *sb;
+};
+
+enum { fsync_wrap = 1 << 4 }; /* Maximum fsync groups in flight */
+
+enum sb_state_bits {
+       TUX3_STATE_TRANSITION_BIT,
+       TUX3_FSYNC_DRAIN_BIT, /* force fsync queue to drain */
+};
+
+enum sb_flag_bits {
+       SB_FSYNC_FLUSH_FLAG = 1 << 0, /* fsync specific actions on flush path */
+};
+
 struct tux3_idefer_map;
 /* Tux3-specific sb is a handle for the entire volume state */
 struct sb {
@@ -321,10 +345,8 @@ struct sb {
        struct delta_ref __rcu *current_delta;  /* current delta */
        struct delta_ref delta_refs[TUX3_MAX_DELTA];
        unsigned unify;                         /* log unify cycle */
-
-#define TUX3_STATE_TRANSITION_BIT      0
        unsigned long backend_state;            /* delta state */
-
+       unsigned long flags;                    /* non atomic state */
 #ifdef TUX3_FLUSHER_SYNC
        struct rw_semaphore delta_lock;         /* delta transition exclusive */
 #else
@@ -403,7 +425,28 @@ struct sb {
 #else
        struct super_block vfs_sb;      /* Userland superblock */
 #endif
-};
+       /*
+        * Fsync and fsync backend
+        */
+       spinlock_t fsync_lock;
+       wait_queue_head_t fsync_collide; /* parallel fsync on same inode */
+       atomic_t fsync_group; /* current fsync group */
+       atomic_t fsync_pending; /* fsyncs started but not yet queued */
+       struct syncgroup fsync[fsync_wrap]; /* fsync commit groups */
+       struct workqueue_struct *fsync_workqueue;
+       struct fsync_work fsync_work;
+       struct delta_ref *suspended;
+ };
+ 
+static inline int fsync_mode(struct sb *sb)
+{
+       return sb->flags & SB_FSYNC_FLUSH_FLAG;
+}
+
+static inline unsigned logcount(struct sb *sb)
+{
+       return be32_to_cpu(sb->super.logcount);
+}
 
 /* Block segment (physical block extent) info */
 #define BLOCK_SEG_HOLE         (1 << 0)
@@ -475,6 +518,7 @@ struct tux3_inode {
        };
 
        /* Per-delta dirty data for inode */
+       unsigned nlink_base;            /* link count on media for fsync */
        unsigned state;                 /* inode dirty state */
        unsigned present;               /* Attributes decoded from or
                                         * to be encoded to itree */
@@ -553,6 +597,8 @@ static inline struct list_head *tux3_dirty_buffers(struct 
inode *inode,
 enum {
        /* Deferred inum allocation, and not stored into itree yet. */
        TUX3_I_DEFER_INUM       = 0,
+       /* Fsync in progress (protected by i_mutex) */
+       TUX3_INODE_FSYNC_BIT    = 1,
 
        /* No per-delta buffers, and no page forking */
        TUX3_I_NO_DELTA         = 29,
@@ -579,6 +625,11 @@ static inline void tux3_inode_clear_flag(int bit, struct 
inode *inode)
        clear_bit(bit, &tux_inode(inode)->flags);
 }
 
+static inline int tux3_inode_test_and_set_flag(int bit, struct inode *inode)
+{
+       return test_and_set_bit(bit, &tux_inode(inode)->flags);
+}
+
 static inline int tux3_inode_test_flag(int bit, struct inode *inode)
 {
        return test_bit(bit, &tux_inode(inode)->flags);
@@ -723,6 +774,8 @@ static inline block_t bufindex(struct buffer_head *buffer)
 /* commit.c */
 long tux3_writeback(struct super_block *super, struct bdi_writeback *wb,
                    struct wb_writeback_work *work);
+int tux3_sync_inode(struct sb *sb, struct inode *inode);
+void fsync_backend(struct work_struct *work);
 
 /* dir.c */
 extern const struct file_operations tux_dir_fops;
@@ -967,6 +1020,7 @@ void log_bnode_merge(struct sb *sb, block_t src, block_t 
dst);
 void log_bnode_del(struct sb *sb, block_t node, tuxkey_t key, unsigned count);
 void log_bnode_adjust(struct sb *sb, block_t node, tuxkey_t from, tuxkey_t to);
 void log_bnode_free(struct sb *sb, block_t bnode);
+void log_fsync_orphan(struct sb *sb, unsigned version, tuxkey_t inum);
 void log_orphan_add(struct sb *sb, unsigned version, tuxkey_t inum);
 void log_orphan_del(struct sb *sb, unsigned version, tuxkey_t inum);
 void log_freeblocks(struct sb *sb, block_t freeblocks);
@@ -995,6 +1049,7 @@ void replay_iput_orphan_inodes(struct sb *sb,
                               struct list_head *orphan_in_otree,
                               int destroy);
 int replay_load_orphan_inodes(struct replay *rp);
+int replay_fsync_orphan(struct replay *rp, unsigned version, inum_t inum);
 
 /* super.c */
 struct replay *tux3_init_fs(struct sb *sbi);
@@ -1045,6 +1100,8 @@ static inline void tux3_mark_inode_dirty_sync(struct 
inode *inode)
        __tux3_mark_inode_dirty(inode, I_DIRTY_SYNC);
 }
 
+unsigned tux3_dirty_flags1(struct inode *inode, unsigned delta);
+void tux3_dirty_switch_to_prev(struct inode *inode, unsigned delta);
 void tux3_dirty_inode(struct inode *inode, int flags);
 void tux3_mark_inode_to_delete(struct inode *inode);
 void tux3_iattrdirty(struct inode *inode);
@@ -1058,6 +1115,7 @@ void tux3_mark_inode_orphan(struct tux3_inode *tuxnode);
 int tux3_inode_is_orphan(struct tux3_inode *tuxnode);
 int tux3_flush_inode_internal(struct inode *inode, unsigned delta, int 
req_flag);
 int tux3_flush_inode(struct inode *inode, unsigned delta, int req_flag);
+int tux3_flush_inodes_list(struct sb *sb, unsigned delta, struct list_head 
*dirty_inodes);
 int tux3_flush_inodes(struct sb *sb, unsigned delta);
 int tux3_has_dirty_inodes(struct sb *sb, unsigned delta);
 void tux3_clear_dirty_inodes(struct sb *sb, unsigned delta);
diff --git a/fs/tux3/user/libklib/libklib.h b/fs/tux3/user/libklib/libklib.h
index 31daad5..ae9bba6 100644
--- a/fs/tux3/user/libklib/libklib.h
+++ b/fs/tux3/user/libklib/libklib.h
@@ -117,4 +117,7 @@ extern int __build_bug_on_failed;
 #define S_IWUGO                (S_IWUSR|S_IWGRP|S_IWOTH)
 #define S_IXUGO                (S_IXUSR|S_IXGRP|S_IXOTH)
 
+struct work_struct { };
+struct workqueue_struct { };
+
 #endif /* !LIBKLIB_H */
diff --git a/fs/tux3/user/super.c b/fs/tux3/user/super.c
index e34a1b4..0743551 100644
--- a/fs/tux3/user/super.c
+++ b/fs/tux3/user/super.c
@@ -15,6 +15,15 @@
 #define trace trace_off
 #endif
 
+static struct workqueue_struct *create_workqueue(char *name) {
+       static struct workqueue_struct fakework = { };
+       return &fakework;
+}
+
+static void destroy_workqueue(struct workqueue_struct *wq) { }
+
+#define INIT_WORK(work, fn)
+
 #include "../super.c"
 
 struct inode *__alloc_inode(struct super_block *sb)
diff --git a/fs/tux3/writeback.c b/fs/tux3/writeback.c
index fc20635..5c6bcf0 100644
--- a/fs/tux3/writeback.c
+++ b/fs/tux3/writeback.c
@@ -102,6 +102,22 @@ static inline unsigned tux3_dirty_flags(struct inode 
*inode, unsigned delta)
        return ret;
 }
 
+unsigned tux3_dirty_flags1(struct inode *inode, unsigned delta)
+{
+       return (tux_inode(inode)->state >> tux3_dirty_shift(delta)) & I_DIRTY;
+}
+
+static inline unsigned tux3_iattrsta_update(unsigned state, unsigned delta);
+void tux3_dirty_switch_to_prev(struct inode *inode, unsigned delta)
+{
+       struct tux3_inode *tuxnode = tux_inode(inode);
+       unsigned state = tuxnode->state;
+
+       state |= tux3_dirty_mask(tux3_dirty_flags(inode, delta) & I_DIRTY, 
delta - 1);
+       state &= ~tux3_dirty_mask(I_DIRTY, delta);
+       tuxnode->state = tux3_iattrsta_update(state, delta - 1);
+}
+
 /* This is hook of __mark_inode_dirty() and called I_DIRTY_PAGES too */
 void tux3_dirty_inode(struct inode *inode, int flags)
 {
@@ -226,6 +242,8 @@ static void tux3_clear_dirty_inode_nolock(struct inode 
*inode, unsigned delta,
        /* Update state if inode isn't dirty anymore */
        if (!(tuxnode->state & ~NON_DIRTY_FLAGS))
                inode->i_state &= ~I_DIRTY;
+
+       tux3_inode_clear_flag(TUX3_INODE_FSYNC_BIT, inode); /* ugly */
 }
 
 /* Clear dirty flags for delta */
@@ -502,12 +520,31 @@ int tux3_flush_inode(struct inode *inode, unsigned delta, 
int req_flag)
                dirty = tux3_dirty_flags(inode, delta);
 
        if (dirty & (TUX3_DIRTY_BTREE | I_DIRTY_SYNC | I_DIRTY_DATASYNC)) {
+               struct tux3_inode *tuxnode = tux_inode(inode);
+               struct sb *sb = tux_sb(inode->i_sb);
                /*
                 * If there is btree root, adjust present after
                 * tux3_flush_buffers().
                 */
                tux3_iattr_adjust_for_btree(inode, &idata);
 
+               if (fsync_mode(sb)) {
+                       if (idata.i_nlink != tuxnode->nlink_base) {
+                               /*
+                                * FIXME: we redirty inode attributes here so 
next delta
+                                * will flush correct nlinks. This means that 
an fsync
+                                * of the same inode before the next delta will 
flush
+                                * it again even it has not been changed.
+                                */
+                               tux3_iattrdirty_delta(inode, 
sb->suspended->delta);
+                               tux3_mark_inode_dirty_sync(inode);
+                               idata.i_nlink = tuxnode->nlink_base;
+                       }
+                       if (!idata.i_nlink)
+                               log_fsync_orphan(sb, sb->version, 
tuxnode->inum);
+               } else
+                       tuxnode->nlink_base = idata.i_nlink;
+
                err = tux3_save_inode(inode, &idata, delta);
                if (err && !ret)
                        ret = err;
@@ -569,10 +606,8 @@ static int inode_inum_cmp(void *priv, struct list_head *a, 
struct list_head *b)
        return 0;
 }
 
-int tux3_flush_inodes(struct sb *sb, unsigned delta)
+int tux3_flush_inodes_list(struct sb *sb, unsigned delta, struct list_head 
*dirty_inodes)
 {
-       struct sb_delta_dirty *s_ddc = tux3_sb_ddc(sb, delta);
-       struct list_head *dirty_inodes = &s_ddc->dirty_inodes;
        struct inode_delta_dirty *i_ddc, *safe;
        inum_t private;
        int err;
@@ -612,6 +647,12 @@ error:
        return err;
 }
 
+int tux3_flush_inodes(struct sb *sb, unsigned delta)
+{
+       struct sb_delta_dirty *s_ddc = tux3_sb_ddc(sb, delta);
+       return tux3_flush_inodes_list(sb, delta, &s_ddc->dirty_inodes);
+}
+
 int tux3_has_dirty_inodes(struct sb *sb, unsigned delta)
 {
        struct sb_delta_dirty *s_ddc = tux3_sb_ddc(sb, delta);
@@ -663,3 +704,4 @@ unsigned tux3_check_tuxinode_state(struct inode *inode)
 {
        return tux_inode(inode)->state & ~NON_DIRTY_FLAGS;
 }
+
diff --git a/fs/tux3/writeback_iattrfork.c b/fs/tux3/writeback_iattrfork.c
index 658c012..c50a8c2 100644
--- a/fs/tux3/writeback_iattrfork.c
+++ b/fs/tux3/writeback_iattrfork.c
@@ -54,10 +54,9 @@ static void idata_copy(struct inode *inode, struct 
tux3_iattr_data *idata)
  *
  * FIXME: this is better to call tux3_mark_inode_dirty() too?
  */
-void tux3_iattrdirty(struct inode *inode)
+void tux3_iattrdirty_delta(struct inode *inode, unsigned delta)
 {
        struct tux3_inode *tuxnode = tux_inode(inode);
-       unsigned delta = tux3_inode_delta(inode);
        unsigned state = tuxnode->state;
 
        /* If dirtied on this delta, nothing to do */
@@ -107,6 +106,11 @@ void tux3_iattrdirty(struct inode *inode)
        spin_unlock(&tuxnode->lock);
 }
 
+void tux3_iattrdirty(struct inode *inode)
+{
+       tux3_iattrdirty_delta(inode, tux3_inode_delta(inode));
+}
+
 /* Caller must hold tuxnode->lock */
 static void tux3_iattr_clear_dirty(struct tux3_inode *tuxnode)
 {
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to