On 07/25/2012 08:15 PM, levin li wrote:
> +#define list_for_each_entry_revert_safe_rcu(pos, n, head, member) \
> +     for (pos = cds_list_entry(rcu_dereference((head)->prev), \
> +                               typeof(*pos), member), \
> +          n = cds_list_entry(rcu_dereference(pos->member.prev),\
> +                             typeof(*pos), member); \
> +                             &pos->member != (head); \
> +          pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\
> +                                      typeof(*pos), member))
>  

This macro should be placed in list.h.

>  struct global_cache {
>       uint64_t cache_size;
> @@ -50,6 +64,7 @@ struct global_cache {
>  
>  struct object_cache {
>       uint32_t vid;
> +     int flushing;

A uint8_t is enough for this flag.

> +
> +static int check_cache_status(struct object_cache *oc,
> +                           struct object_cache_entry *entry)
> +{
> +     int refcnt = uatomic_read(&entry->refcnt);
> +
> +     if (cache_is_flushing(oc)) {
> +             dprintf("cache %" PRIx32 " is flushing, don't reclaim it.\n",
> +                     oc->vid);
> +             return SD_RES_CACHE_FLUSHING;
> +     }
> +
> +     /* If entry is being accessed, we don't reclaim it */
> +     if (refcnt > 0) {

The uatomic_read(&entry->refcnt) call should be placed in the if clause.

> +             dprintf("cache object %" PRIx32 "(%08" PRIx32 ") "
> +                     "can't be reclaimed, refcnt: %d\n",
> +                     oc->vid, entry->idx, refcnt);
> +             return SD_RES_CACHE_REFERENCING;
> +     }
> +
> +     return SD_RES_SUCCESS;
> +}
> +
> +static int reclaim_object(struct object_cache_entry *entry)
> +{
> +     struct object_cache *oc = entry->oc;
> +     uint32_t idx = entry->idx;
> +     int ret = SD_RES_SUCCESS;
> +
> +     pthread_rwlock_wrlock(&oc->lock);
> +     dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n",
> +             oc->vid, idx, uatomic_read(&sys_cache.cache_size));
> +
> +     ret = check_cache_status(oc, entry);
> +     if (ret != SD_RES_SUCCESS)
> +             goto out;
> +
> +     if (entry_is_dirty(entry)) {
> +             uint64_t bmap = entry->bmap;
> +             int create = entry->flags & ENTRY_CREATE_BIT;
> +
> +             entry->bmap = 0;
> +             del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
> +             pthread_rwlock_unlock(&oc->lock);
> +
> +             ret = push_cache_object(oc->vid, idx, bmap, create);
> +
> +             pthread_rwlock_wrlock(&oc->lock);
> +             if (ret == SD_RES_SUCCESS) {
> +                     entry->flags &= ~ENTRY_CREATE_BIT;
> +                     /* dirty again */
> +                     if (entry_is_dirty(entry)) {
> +                             dprintf("object cache is dirty again %06" 
> PRIx32 "\n", idx);
> +                             ret = SD_RES_CACHE_REFERENCING;
> +                             goto out;
> +                     }
> +             } else {
> +                     /* still dirty */
> +                     entry->bmap |= bmap;
> +                     ret = SD_RES_CACHE_REFERENCING;
> +                     goto out;
> +             }
> +
> +             /* Now we get lock again, check cache status again */
> +             ret = check_cache_status(oc, entry);
> +             if (ret != SD_RES_SUCCESS)
> +                     goto out;
> +     }
> +
> +     /* Mark the entry as being reclaimed */
> +     entry_start_reclaiming(entry);

Rename entry_start_reclaiming() to mark_entry_dirty() instead of adding
another comment.

> +
> +     ret = remove_cache_object(oc, idx);
> +     if (ret == SD_RES_SUCCESS)
> +             del_from_object_tree_and_list(entry, &oc->object_tree);
> +out:
> +     pthread_rwlock_unlock(&oc->lock);
> +     return ret;
> +}
> +
> +static void reclaim_work(struct work *work)
> +{
> +     struct object_cache_entry *entry, *n;
> +     int ret;
> +
> +     if (node_in_recovery())
> +             return;
> +
> +     list_for_each_entry_revert_safe_rcu(entry, n,
> +                    &sys_cache.cache_lru_list, lru_list) {
> +             /* Reclaim cache to 80% of max size */
> +             if (uatomic_read(&sys_cache.cache_size) <=
> +                 sys->cache_size * 8 / 10)
> +                     break;
> +
> +             ret = reclaim_object(entry);

Check if (ret == SD_RES_CACHE_FLUSHING) first; then we don't need the else
clause.

> +             if (ret == SD_RES_SUCCESS) {
> +                     unsigned data_length;
> +                     if (idx_has_vdi_bit(entry->idx))
> +                             data_length = SD_INODE_SIZE / 1024;
> +                     else
> +                             data_length = SD_DATA_OBJ_SIZE / 1024;
> +

Why 1024? Can't we simply do cache_size - data_length?

> +                     uatomic_sub(&sys_cache.cache_size, data_length);
> +                     free(entry);
> +             } else if (ret == SD_RES_CACHE_FLUSHING)
> +                     /* If cache is flushing, stop reclaiming. */
> +                     break;
> +     }
> +
> +     dprintf("cache reclaim complete\n");
> +}
> +
> +static void reclaim_done(struct work *work)
> +{
> +     uatomic_set(&sys_cache.reclaiming, 0);
> +     free(work);
> +}
> +
>  static struct object_cache_entry *
>  dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>                 uint64_t bmap, int create)
> @@ -183,6 +406,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>               else {
>                       /* already has this entry, merge bmap */
>                       entry->bmap |= bmap;
> +                     if (create)
> +                             entry->flags |= ENTRY_CREATE_BIT;
>                       return entry;
>               }
>       }
> @@ -192,7 +417,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>               return NULL;
>  
>       entry->bmap |= bmap;
> -     entry->flags |= ENTRY_CREATE_BIT;
> +     if (create)
> +             entry->flags |= ENTRY_CREATE_BIT;
>       rb_link_node(&entry->dirty_node, parent, p);
>       rb_insert_color(&entry->dirty_node, &oc->dirty_tree);
>       list_add(&entry->list, &oc->dirty_list);
> @@ -200,26 +426,6 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>       return entry;
>  }
>  
> -static struct object_cache_entry *dirty_tree_search(struct rb_root *root,
> -                                                 uint32_t idx)
> -{
> -     struct rb_node *n = root->rb_node;
> -     struct object_cache_entry *t;
> -
> -     while (n) {
> -             t = rb_entry(n, struct object_cache_entry, dirty_node);
> -
> -             if (idx < t->idx)
> -                     n = n->rb_left;
> -             else if (idx > t->idx)
> -                     n = n->rb_right;
> -             else
> -                     return t; /* found it */
> -     }
> -
> -     return NULL;
> -}
> -
>  static int create_dir_for(uint32_t vid)
>  {
>       int ret = 0;
> @@ -257,6 +463,7 @@ not_found:
>       if (create) {
>               cache = xzalloc(sizeof(*cache));
>               cache->vid = vid;
> +             cache->object_tree = RB_ROOT;
>               create_dir_for(vid);
>  
>               cache->dirty_tree = RB_ROOT;
> @@ -271,14 +478,6 @@ out:
>       return cache;
>  }
>  
> -static inline void
> -del_from_dirty_tree_and_list(struct object_cache_entry *entry,
> -                          struct rb_root *dirty_tree)
> -{
> -     rb_erase(&entry->dirty_node, dirty_tree);
> -     list_del(&entry->list);
> -}
> -
>  /* Caller should hold the oc->lock */
>  static inline void
>  add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
> @@ -289,6 +488,9 @@ add_to_dirty_tree_and_list(struct object_cache *oc, 
> uint32_t idx,
>       if (!entry)
>               panic("Can not find object entry %" PRIx32 "\n", idx);
>  
> +     if (cache_is_reclaiming())

This should be renamed to cache_in_reclaim().

> +             return;
> +
>       /* If cache isn't in reclaiming, move it
>        * to the head of lru list */
>       cds_list_del_rcu(&entry->lru_list);
> @@ -321,6 +523,9 @@ static void add_to_object_cache(struct object_cache *oc, 
> uint32_t idx)
>       entry->idx = idx;
>       CDS_INIT_LIST_HEAD(&entry->lru_list);
>  
> +     dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 "added\n",
> +             oc->vid, idx);
> +
>       pthread_rwlock_wrlock(&oc->lock);
>       old = object_cache_insert(&oc->object_tree, entry);
>       if (!old) {
> @@ -331,20 +536,55 @@ static void add_to_object_cache(struct object_cache 
> *oc, uint32_t idx)
>               entry = old;
>       }
>       pthread_rwlock_unlock(&oc->lock);
> +
> +     dprintf("sys_cache.cache_size %" PRIx64 ", sys->cache_size %" PRIx64 
> "\n",
> +             uatomic_read(&sys_cache.cache_size), sys->cache_size);
> +     if (sys->cache_size &&
> +         uatomic_read(&sys_cache.cache_size) > sys->cache_size &&
> +         !cache_is_reclaiming()) {
> +             struct work *work = xzalloc(sizeof(struct work));
> +             uatomic_set(&sys_cache.reclaiming, 1);
> +             work->fn = reclaim_work;
> +             work->done = reclaim_done;
> +             queue_work(sys->reclaim_wqueue, work);
> +     }
> +}
> +
> +static inline int cache_sanity_check(struct object_cache *oc, uint32_t idx,
> +                                  struct object_cache_entry **entry)
> +{
> +     struct object_cache_entry *ent;
> +
> +     ent = object_tree_search(&oc->object_tree, idx);
> +
> +     if (!ent || entry_is_reclaiming(ent))
> +             return SD_RES_NO_CACHE;
> +
> +     if (entry)
> +             *entry = ent;
> +
> +     return SD_RES_SUCCESS;
>  }

This would be better renamed to get_cache_entry(), which increments the
refcnt internally on success. Also add a helper find_cache_entry() to find
the entry, and put_cache_entry() to decrement the refcnt (and do any other
uninit work).

        get_cache_entry() {
                entry = find_cache_entry();
                ...
                inc refcnt..
                ...
        }

Then find_cache_entry() can be used by object_cache_lookup() too.

>  
>  static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
>                              int create)
>  {
>       struct strbuf buf;
> -     int fd, ret = 0, flags = def_open_flags;
> +     int fd, ret = SD_RES_SUCCESS, flags = def_open_flags;
> +     unsigned data_length;
> +
> +     if (!create) {
> +             pthread_rwlock_wrlock(&oc->lock);
> +             ret = cache_sanity_check(oc, idx, NULL);
> +             pthread_rwlock_unlock(&oc->lock);
> +             return ret;
> +     }
>  
>       strbuf_init(&buf, PATH_MAX);
>       strbuf_addstr(&buf, cache_dir);
>       strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
>  
> -     if (create)
> -             flags |= O_CREAT | O_TRUNC;
> +     flags |= O_CREAT | O_TRUNC;
>  
>       fd = open(buf.buf, flags, def_fmode);
>       if (fd < 0) {
> @@ -352,26 +592,22 @@ static int object_cache_lookup(struct object_cache *oc, 
> uint32_t idx,
>               goto out;
>       }
>  
> -     if (create) {
> -             unsigned data_length;
> -
> -             if (idx_has_vdi_bit(idx))
> -                     data_length = SD_INODE_SIZE;
> -             else
> -                     data_length = SD_DATA_OBJ_SIZE;
> +     if (idx_has_vdi_bit(idx))
> +             data_length = SD_INODE_SIZE;
> +     else
> +             data_length = SD_DATA_OBJ_SIZE;
>  
> -             ret = prealloc(fd, data_length);
> -             if (ret != SD_RES_SUCCESS)
> -                     ret = -1;
> -             else {
> -                     uint64_t bmap = UINT64_MAX;
> +     ret = prealloc(fd, data_length);
> +     if (ret != SD_RES_SUCCESS)
> +             ret = -1;
> +     else {
> +             uint64_t bmap = UINT64_MAX;
>  
> -                     add_to_object_cache(oc, idx);
> +             add_to_object_cache(oc, idx);
>  
> -                     pthread_rwlock_wrlock(&oc->lock);
> -                     add_to_dirty_tree_and_list(oc, idx, bmap, 1);
> -                     pthread_rwlock_unlock(&oc->lock);
> -             }
> +             pthread_rwlock_wrlock(&oc->lock);
> +             add_to_dirty_tree_and_list(oc, idx, bmap, 1);
> +             pthread_rwlock_unlock(&oc->lock);
>       }
>       close(fd);
>  out:
> @@ -651,20 +887,23 @@ static int object_cache_push(struct object_cache *oc)
>               pthread_rwlock_rdlock(&oc->lock);
>               bmap = entry->bmap;
>               create = entry->flags & ENTRY_CREATE_BIT;
> +             entry->bmap = 0;
>               pthread_rwlock_unlock(&oc->lock);
>  
>               ret = push_cache_object(oc->vid, entry->idx, bmap, create);
> -             if (ret != SD_RES_SUCCESS)
> -                     goto push_failed;
>  
>               pthread_rwlock_wrlock(&oc->lock);
> +             if (ret != SD_RES_SUCCESS) {
> +                     entry->bmap |= bmap;
> +                     goto push_failed;
> +             }
> +             entry->flags &= ~ENTRY_CREATE_BIT;
>               del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
>               pthread_rwlock_unlock(&oc->lock);
>       }
>       return ret;
>  
>  push_failed:
> -     pthread_rwlock_wrlock(&oc->lock);
>       list_splice_init(&inactive_dirty_list, &oc->dirty_list);
>       pthread_rwlock_unlock(&oc->lock);
>  
> @@ -681,10 +920,10 @@ int object_is_cached(uint64_t oid)
>       if (!cache)
>               return 0;
>  
> -     if (object_cache_lookup(cache, idx, 0) < 0)
> -             return 0;
> +     if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS)
> +             return 1;
>       else
> -             return 1; /* found it */
> +             return 0;
>  }
>  
>  void object_cache_delete(uint32_t vid)
> @@ -716,6 +955,52 @@ void object_cache_delete(uint32_t vid)
>  
>  }
>  
> +static void object_cache_flush_begin(struct object_cache *oc)
> +{
> +     pthread_rwlock_wrlock(&oc->lock);
> +     oc->flushing = 1;
> +     pthread_rwlock_unlock(&oc->lock);
> +}
> +
> +static void object_cache_flush_end(struct object_cache *oc)
> +{
> +     pthread_rwlock_wrlock(&oc->lock);
> +     oc->flushing = 0;
> +     pthread_rwlock_unlock(&oc->lock);
> +}

Why can't flushing be atomic? It would be better to fold these two helpers
into their callers.

> +
> +static int object_cache_access_begin(struct object_cache *cache, uint32_t 
> idx,
> +                                  struct object_cache_entry **entry)
> +{
> +     struct object_cache_entry *ent;
> +     int ret = SD_RES_SUCCESS;
> +
> +     if (entry)
> +             *entry = NULL;
> +
> +     pthread_rwlock_rdlock(&cache->lock);
> +     ret = cache_sanity_check(cache, idx, &ent);
> +     if (ret != SD_RES_SUCCESS) {
> +             /* The cache entry may be reclaimed, so try again. */
> +             pthread_rwlock_unlock(&cache->lock);
> +             dprintf("cache sanity error %d\n", ret);
> +             return ret;
> +     }
> +
> +     uatomic_inc(&ent->refcnt);
> +     pthread_rwlock_unlock(&cache->lock);
> +
> +     if (entry)
> +             *entry = ent;
> +
> +     return ret;
> +}
> +

It would be cleaner to call access_begin/end() inside read/write_cache_object().

> +static void object_cache_access_end(struct object_cache_entry *entry)
> +{
> +     uatomic_dec(&entry->refcnt);
> +}
> +
>  static int object_cache_flush_and_delete(struct object_cache *oc)
>  {
>       DIR *dir;
> @@ -726,6 +1011,8 @@ static int object_cache_flush_and_delete(struct 
> object_cache *oc)
>       struct strbuf p;
>       int ret = 0;
>  
> +     object_cache_flush_begin(oc);
> +
>       strbuf_init(&p, PATH_MAX);
>       strbuf_addstr(&p, cache_dir);
>       strbuf_addf(&p, "/%06"PRIx32, vid);
> @@ -754,6 +1041,8 @@ static int object_cache_flush_and_delete(struct 
> object_cache *oc)
>       }
>  
>       object_cache_delete(vid);
> +
> +     object_cache_flush_end(oc);
>  out:
>       strbuf_release(&p);
>       return ret;
> @@ -777,10 +1066,10 @@ int bypass_object_cache(struct request *req)
>                       /* For read requet, we can read cache if any */
>                       uint32_t idx = object_cache_oid_to_idx(oid);
>  
> -                     if (object_cache_lookup(cache, idx, 0) < 0)
> -                             return 1;
> -                     else
> +                     if (object_cache_lookup(cache, idx, 0) == 0)
>                               return 0;
> +                     else
> +                             return 1;
>               }
>       }
>  
> @@ -811,37 +1100,42 @@ int object_cache_handle_request(struct request *req)
>       if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
>               create = 1;
>  
> -     if (object_cache_lookup(cache, idx, create) < 0) {
> +retry:
> +     ret = object_cache_lookup(cache, idx, create);
> +     if (ret == SD_RES_NO_CACHE) {
>               ret = object_cache_pull(cache, idx);
>               if (ret != SD_RES_SUCCESS)
>                       return ret;
>       }
>  
> -     pthread_rwlock_rdlock(&cache->lock);
> -     entry = object_tree_search(&cache->object_tree, idx);
> -     pthread_rwlock_unlock(&cache->lock);
> +     ret = object_cache_access_begin(cache, idx, &entry);
> +     if (ret != SD_RES_SUCCESS) {
> +             dprintf("retrying... %d\n", ret);
> +             goto retry;
> +     }
>  
>       if (hdr->flags & SD_FLAG_CMD_WRITE) {
>               ret = write_cache_object(cache->vid, idx, req->data,
>                                        hdr->data_length, hdr->obj.offset);
>               if (ret != SD_RES_SUCCESS)
> -                     goto out;
> +                     goto err;
>               update_cache_entry(cache, idx, hdr->data_length,
>                                  hdr->obj.offset);
>       } else {
>               ret = read_cache_object(cache->vid, idx, req->data,
>                                       hdr->data_length, hdr->obj.offset);
>               if (ret != SD_RES_SUCCESS)
> -                     goto out;
> +                     goto err;
>               req->rp.data_length = hdr->data_length;
>  
> -             if (entry) {
> +             if (entry && !cache_is_reclaiming()) {
>                       cds_list_del_rcu(&entry->lru_list);
>                       cds_list_add_rcu(&entry->lru_list,
>                                        &sys_cache.cache_lru_list);
>               }
>       }
> -out:
> +err:
> +     object_cache_access_end(entry);
>       return ret;
>  }
>  
> @@ -851,13 +1145,23 @@ int object_cache_write(uint64_t oid, char *data, 
> unsigned int datalen,
>       uint32_t vid = oid_to_vid(oid);
>       uint32_t idx = object_cache_oid_to_idx(oid);
>       struct object_cache *cache;
> +     struct object_cache_entry *entry;
>       int ret;
>  
>       cache = find_object_cache(vid, 0);
>  
> +     dprintf("cache object write %" PRIx32 "\n", idx);
> +
> +     ret = object_cache_access_begin(cache, idx, &entry);
> +     if (ret != SD_RES_SUCCESS)
> +             panic("cache object %" PRIx32 " doesn't exist\n", idx);
> +
>       ret = write_cache_object(vid, idx, data, datalen, offset);
>       if (ret == SD_RES_SUCCESS)
>               update_cache_entry(cache, idx, datalen, offset);
> +
> +     object_cache_access_end(entry);
> +
>       return ret;
>  }
>  
> @@ -866,20 +1170,40 @@ int object_cache_read(uint64_t oid, char *data, 
> unsigned int datalen,
>  {
>       uint32_t vid = oid_to_vid(oid);
>       uint32_t idx = object_cache_oid_to_idx(oid);
> +     struct object_cache *cache;
> +     struct object_cache_entry *entry;
> +     int ret;
> +
> +     cache = find_object_cache(vid, 0);
> +
> +     dprintf("cache object read %" PRIx32 "\n", idx);
> +
> +     ret = object_cache_access_begin(cache, idx, &entry);
> +     if (ret != SD_RES_SUCCESS)
> +             panic("cache object %" PRIx32 " doesn't exist\n", idx);
>  
> -     return read_cache_object(vid, idx, data, datalen, offset);
> +     ret = read_cache_object(vid, idx, data, datalen, offset);
> +
> +     object_cache_access_end(entry);
> +
> +     return ret;
>  }
>  
>  int object_cache_flush_vdi(struct request *req)
>  {
>       uint32_t vid = oid_to_vid(req->rq.obj.oid);
>       struct object_cache *cache;
> +     int ret;
>  
>       cache = find_object_cache(vid, 0);
>       if (!cache)
>               return SD_RES_SUCCESS;
>  
> -     return object_cache_push(cache);
> +     object_cache_flush_begin(cache);
> +     ret = object_cache_push(cache);
> +     object_cache_flush_end(cache);
> +
> +     return ret;
>  }
>  
>  int object_cache_flush_and_del(struct request *req)
> @@ -888,8 +1212,10 @@ int object_cache_flush_and_del(struct request *req)
>       struct object_cache *cache;
>  
>       cache = find_object_cache(vid, 0);
> +
>       if (cache && object_cache_flush_and_delete(cache) < 0)
>               return SD_RES_EIO;
> +
>       return SD_RES_SUCCESS;
>  }
>  
> @@ -1003,6 +1329,7 @@ int object_cache_init(const char *p)
>  
>       CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list);
>       uatomic_set(&sys_cache.cache_size, 0);
> +     uatomic_set(&sys_cache.reclaiming, 0);
>  
>       ret = load_existing_cache();
>  err:
> diff --git a/sheep/sheep.c b/sheep/sheep.c
> index 8b78669..29270e9 100644
> --- a/sheep/sheep.c
> +++ b/sheep/sheep.c
> @@ -359,8 +359,9 @@ int main(int argc, char **argv)
>       sys->deletion_wqueue = init_work_queue("deletion", true);
>       sys->block_wqueue = init_work_queue("block", true);
>       sys->sockfd_wqueue = init_work_queue("sockfd", true);
> +     sys->reclaim_wqueue = init_work_queue("reclaim", true);
>       if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
> -         !sys->deletion_wqueue || !sys->block_wqueue)
> +         !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue)
>               exit(1);
>  
>       ret = trace_init();
> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
> index fe61411..2090d67 100644
> --- a/sheep/sheep_priv.h
> +++ b/sheep/sheep_priv.h
> @@ -125,6 +125,7 @@ struct cluster_info {
>       struct work_queue *recovery_wqueue;
>       struct work_queue *block_wqueue;
>       struct work_queue *sockfd_wqueue;
> +     struct work_queue *reclaim_wqueue;
>  };
>  
>  struct siocb {
> diff --git a/sheep/store.c b/sheep/store.c
> index a05822d..4a65c9e 100644
> --- a/sheep/store.c
> +++ b/sheep/store.c
> @@ -492,9 +492,13 @@ int write_object(uint64_t oid, char *data, unsigned int 
> datalen,
>       struct sd_req hdr;
>       int ret;
>  
> +retry:
>       if (sys->enable_write_cache && object_is_cached(oid)) {
>               ret = object_cache_write(oid, data, datalen, offset,
>                                        flags, create);
> +             if (ret == SD_RES_NO_CACHE)
> +                     goto retry;
> +
>               if (ret != 0) {
>                       eprintf("write cache failed %"PRIx64" %"PRIx32"\n",
>                               oid, ret);
> @@ -529,8 +533,12 @@ int read_object(uint64_t oid, char *data, unsigned int 
> datalen,
>       struct sd_req hdr;
>       int ret;
>  
> +retry:
>       if (sys->enable_write_cache && object_is_cached(oid)) {
>               ret = object_cache_read(oid, data, datalen, offset);
> +             if (ret == SD_RES_NO_CACHE)
> +                     goto retry;
> +
>               if (ret != SD_RES_SUCCESS) {
>                       eprintf("try forward read %"PRIx64" %"PRIx32"\n",
>                               oid, ret);
> 

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to