On 2012年07月26日 11:27, Liu Yuan wrote:
> On 07/25/2012 08:15 PM, levin li wrote:
>> +#define list_for_each_entry_revert_safe_rcu(pos, n, head, member) \
>> +    for (pos = cds_list_entry(rcu_dereference((head)->prev), \
>> +                              typeof(*pos), member), \
>> +         n = cds_list_entry(rcu_dereference(pos->member.prev),\
>> +                            typeof(*pos), member); \
>> +                            &pos->member != (head); \
>> +         pos = n, n = cds_list_entry(rcu_dereference(pos->member.prev),\
>> +                                     typeof(*pos), member))
>>  
> 
> This should be placed in list.h
> 
>>  struct global_cache {
>>      uint64_t cache_size;
>> @@ -50,6 +64,7 @@ struct global_cache {
>>  
>>  struct object_cache {
>>      uint32_t vid;
>> +    int flushing;
> 
> uint8_t is enough.
> 
>> +
>> +static int check_cache_status(struct object_cache *oc,
>> +                          struct object_cache_entry *entry)
>> +{
>> +    int refcnt = uatomic_read(&entry->refcnt);
>> +
>> +    if (cache_is_flushing(oc)) {
>> +            dprintf("cache %" PRIx32 " is flushing, don't reclaim it.\n",
>> +                    oc->vid);
>> +            return SD_RES_CACHE_FLUSHING;
>> +    }
>> +
>> +    /* If entry is being accessed, we don't reclaim it */
>> +    if (refcnt > 0) {
> 
> uatomic_read(&entry->refcnt) should be placed in the if clause.
> 
>> +            dprintf("cache object %" PRIx32 "(%08" PRIx32 ") "
>> +                    "can't be reclaimed, refcnt: %d\n",
>> +                    oc->vid, entry->idx, refcnt);
>> +            return SD_RES_CACHE_REFERENCING;
>> +    }
>> +
>> +    return SD_RES_SUCCESS;
>> +}
>> +
>> +static int reclaim_object(struct object_cache_entry *entry)
>> +{
>> +    struct object_cache *oc = entry->oc;
>> +    uint32_t idx = entry->idx;
>> +    int ret = SD_RES_SUCCESS;
>> +
>> +    pthread_rwlock_wrlock(&oc->lock);
>> +    dprintf("reclaiming /%06"PRIx32"/%08"PRIx32", cache_size: %ld\n",
>> +            oc->vid, idx, uatomic_read(&sys_cache.cache_size));
>> +
>> +    ret = check_cache_status(oc, entry);
>> +    if (ret != SD_RES_SUCCESS)
>> +            goto out;
>> +
>> +    if (entry_is_dirty(entry)) {
>> +            uint64_t bmap = entry->bmap;
>> +            int create = entry->flags & ENTRY_CREATE_BIT;
>> +
>> +            entry->bmap = 0;
>> +            del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
>> +            pthread_rwlock_unlock(&oc->lock);
>> +
>> +            ret = push_cache_object(oc->vid, idx, bmap, create);
>> +
>> +            pthread_rwlock_wrlock(&oc->lock);
>> +            if (ret == SD_RES_SUCCESS) {
>> +                    entry->flags &= ~ENTRY_CREATE_BIT;
>> +                    /* dirty again */
>> +                    if (entry_is_dirty(entry)) {
>> +                            dprintf("object cache is dirty again %06" 
>> PRIx32 "\n", idx);
>> +                            ret = SD_RES_CACHE_REFERENCING;
>> +                            goto out;
>> +                    }
>> +            } else {
>> +                    /* still dirty */
>> +                    entry->bmap |= bmap;
>> +                    ret = SD_RES_CACHE_REFERENCING;
>> +                    goto out;
>> +            }
>> +
>> +            /* Now we get lock again, check cache status again */
>> +            ret = check_cache_status(oc, entry);
>> +            if (ret != SD_RES_SUCCESS)
>> +                    goto out;
>> +    }
>> +
>> +    /* Mark the entry as being reclaimed */
>> +    entry_start_reclaiming(entry);
> 
> rename entry_start_reclaiming as mark_entry_dirty() instead of adding
> another comment.
> 
>> +
>> +    ret = remove_cache_object(oc, idx);
>> +    if (ret == SD_RES_SUCCESS)
>> +            del_from_object_tree_and_list(entry, &oc->object_tree);
>> +out:
>> +    pthread_rwlock_unlock(&oc->lock);
>> +    return ret;
>> +}
>> +
>> +static void reclaim_work(struct work *work)
>> +{
>> +    struct object_cache_entry *entry, *n;
>> +    int ret;
>> +
>> +    if (node_in_recovery())
>> +            return;
>> +
>> +    list_for_each_entry_revert_safe_rcu(entry, n,
>> +                   &sys_cache.cache_lru_list, lru_list) {
>> +            /* Reclaim cache to 80% of max size */
>> +            if (uatomic_read(&sys_cache.cache_size) <=
>> +                sys->cache_size * 8 / 10)
>> +                    break;
>> +
>> +            ret = reclaim_object(entry);
> 
> Check for ret == SD_RES_CACHE_FLUSHING first; then we don't need the
> else clause.
> 
>> +            if (ret == SD_RES_SUCCESS) {
>> +                    unsigned data_length;
>> +                    if (idx_has_vdi_bit(entry->idx))
>> +                            data_length = SD_INODE_SIZE / 1024;
>> +                    else
>> +                            data_length = SD_DATA_OBJ_SIZE / 1024;
>> +
> 
> Why 1024? Can't we simply do cache_size - data_length?
> 
>> +                    uatomic_sub(&sys_cache.cache_size, data_length);
>> +                    free(entry);
>> +            } else if (ret == SD_RES_CACHE_FLUSHING)
>> +                    /* If cache is flushing, stop reclaiming. */
>> +                    break;
>> +    }
>> +
>> +    dprintf("cache reclaim complete\n");
>> +}
>> +
>> +static void reclaim_done(struct work *work)
>> +{
>> +    uatomic_set(&sys_cache.reclaiming, 0);
>> +    free(work);
>> +}
>> +
>>  static struct object_cache_entry *
>>  dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>>                uint64_t bmap, int create)
>> @@ -183,6 +406,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>>              else {
>>                      /* already has this entry, merge bmap */
>>                      entry->bmap |= bmap;
>> +                    if (create)
>> +                            entry->flags |= ENTRY_CREATE_BIT;
>>                      return entry;
>>              }
>>      }
>> @@ -192,7 +417,8 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>>              return NULL;
>>  
>>      entry->bmap |= bmap;
>> -    entry->flags |= ENTRY_CREATE_BIT;
>> +    if (create)
>> +            entry->flags |= ENTRY_CREATE_BIT;
>>      rb_link_node(&entry->dirty_node, parent, p);
>>      rb_insert_color(&entry->dirty_node, &oc->dirty_tree);
>>      list_add(&entry->list, &oc->dirty_list);
>> @@ -200,26 +426,6 @@ dirty_tree_insert(struct object_cache *oc, uint32_t idx,
>>      return entry;
>>  }
>>  
>> -static struct object_cache_entry *dirty_tree_search(struct rb_root *root,
>> -                                                uint32_t idx)
>> -{
>> -    struct rb_node *n = root->rb_node;
>> -    struct object_cache_entry *t;
>> -
>> -    while (n) {
>> -            t = rb_entry(n, struct object_cache_entry, dirty_node);
>> -
>> -            if (idx < t->idx)
>> -                    n = n->rb_left;
>> -            else if (idx > t->idx)
>> -                    n = n->rb_right;
>> -            else
>> -                    return t; /* found it */
>> -    }
>> -
>> -    return NULL;
>> -}
>> -
>>  static int create_dir_for(uint32_t vid)
>>  {
>>      int ret = 0;
>> @@ -257,6 +463,7 @@ not_found:
>>      if (create) {
>>              cache = xzalloc(sizeof(*cache));
>>              cache->vid = vid;
>> +            cache->object_tree = RB_ROOT;
>>              create_dir_for(vid);
>>  
>>              cache->dirty_tree = RB_ROOT;
>> @@ -271,14 +478,6 @@ out:
>>      return cache;
>>  }
>>  
>> -static inline void
>> -del_from_dirty_tree_and_list(struct object_cache_entry *entry,
>> -                         struct rb_root *dirty_tree)
>> -{
>> -    rb_erase(&entry->dirty_node, dirty_tree);
>> -    list_del(&entry->list);
>> -}
>> -
>>  /* Caller should hold the oc->lock */
>>  static inline void
>>  add_to_dirty_tree_and_list(struct object_cache *oc, uint32_t idx,
>> @@ -289,6 +488,9 @@ add_to_dirty_tree_and_list(struct object_cache *oc, 
>> uint32_t idx,
>>      if (!entry)
>>              panic("Can not find object entry %" PRIx32 "\n", idx);
>>  
>> +    if (cache_is_reclaiming())
> 
> should be renamed as cache_in_reclaim()
> 
>> +            return;
>> +
>>      /* If cache isn't in reclaiming, move it
>>       * to the head of lru list */
>>      cds_list_del_rcu(&entry->lru_list);
>> @@ -321,6 +523,9 @@ static void add_to_object_cache(struct object_cache *oc, 
>> uint32_t idx)
>>      entry->idx = idx;
>>      CDS_INIT_LIST_HEAD(&entry->lru_list);
>>  
>> +    dprintf("cache object for vdi %" PRIx32 ", idx %08" PRIx32 "added\n",
>> +            oc->vid, idx);
>> +
>>      pthread_rwlock_wrlock(&oc->lock);
>>      old = object_cache_insert(&oc->object_tree, entry);
>>      if (!old) {
>> @@ -331,20 +536,55 @@ static void add_to_object_cache(struct object_cache 
>> *oc, uint32_t idx)
>>              entry = old;
>>      }
>>      pthread_rwlock_unlock(&oc->lock);
>> +
>> +    dprintf("sys_cache.cache_size %" PRIx64 ", sys->cache_size %" PRIx64 
>> "\n",
>> +            uatomic_read(&sys_cache.cache_size), sys->cache_size);
>> +    if (sys->cache_size &&
>> +        uatomic_read(&sys_cache.cache_size) > sys->cache_size &&
>> +        !cache_is_reclaiming()) {
>> +            struct work *work = xzalloc(sizeof(struct work));
>> +            uatomic_set(&sys_cache.reclaiming, 1);
>> +            work->fn = reclaim_work;
>> +            work->done = reclaim_done;
>> +            queue_work(sys->reclaim_wqueue, work);
>> +    }
>> +}
>> +
>> +static inline int cache_sanity_check(struct object_cache *oc, uint32_t idx,
>> +                                 struct object_cache_entry **entry)
>> +{
>> +    struct object_cache_entry *ent;
>> +
>> +    ent = object_tree_search(&oc->object_tree, idx);
>> +
>> +    if (!ent || entry_is_reclaiming(ent))
>> +            return SD_RES_NO_CACHE;
>> +
>> +    if (entry)
>> +            *entry = ent;
>> +
>> +    return SD_RES_SUCCESS;
>>  }
> 
> This would be better renamed as get_cache_entry(), which increments the
> refcnt inside if successful. Also add two more helpers: find_cache_entry()
> to find the entry, and put_cache_entry() to decrement the refcnt (plus
> any other uninit work).
> 
>       get_cache_entry() {
>               entry = find_cache_entry();
>               ...
>               inc refcnt..
>               ...
>       }
> 
> Then find_cache_entry can be used by object_cache_lookup() too.
> 

Do you mean renaming object_cache_access_begin/end to get{put}_cache_entry?

>>  
>>  static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
>>                             int create)
>>  {
>>      struct strbuf buf;
>> -    int fd, ret = 0, flags = def_open_flags;
>> +    int fd, ret = SD_RES_SUCCESS, flags = def_open_flags;
>> +    unsigned data_length;
>> +
>> +    if (!create) {
>> +            pthread_rwlock_wrlock(&oc->lock);
>> +            ret = cache_sanity_check(oc, idx, NULL);
>> +            pthread_rwlock_unlock(&oc->lock);
>> +            return ret;
>> +    }
>>  
>>      strbuf_init(&buf, PATH_MAX);
>>      strbuf_addstr(&buf, cache_dir);
>>      strbuf_addf(&buf, "/%06"PRIx32"/%08"PRIx32, oc->vid, idx);
>>  
>> -    if (create)
>> -            flags |= O_CREAT | O_TRUNC;
>> +    flags |= O_CREAT | O_TRUNC;
>>  
>>      fd = open(buf.buf, flags, def_fmode);
>>      if (fd < 0) {
>> @@ -352,26 +592,22 @@ static int object_cache_lookup(struct object_cache 
>> *oc, uint32_t idx,
>>              goto out;
>>      }
>>  
>> -    if (create) {
>> -            unsigned data_length;
>> -
>> -            if (idx_has_vdi_bit(idx))
>> -                    data_length = SD_INODE_SIZE;
>> -            else
>> -                    data_length = SD_DATA_OBJ_SIZE;
>> +    if (idx_has_vdi_bit(idx))
>> +            data_length = SD_INODE_SIZE;
>> +    else
>> +            data_length = SD_DATA_OBJ_SIZE;
>>  
>> -            ret = prealloc(fd, data_length);
>> -            if (ret != SD_RES_SUCCESS)
>> -                    ret = -1;
>> -            else {
>> -                    uint64_t bmap = UINT64_MAX;
>> +    ret = prealloc(fd, data_length);
>> +    if (ret != SD_RES_SUCCESS)
>> +            ret = -1;
>> +    else {
>> +            uint64_t bmap = UINT64_MAX;
>>  
>> -                    add_to_object_cache(oc, idx);
>> +            add_to_object_cache(oc, idx);
>>  
>> -                    pthread_rwlock_wrlock(&oc->lock);
>> -                    add_to_dirty_tree_and_list(oc, idx, bmap, 1);
>> -                    pthread_rwlock_unlock(&oc->lock);
>> -            }
>> +            pthread_rwlock_wrlock(&oc->lock);
>> +            add_to_dirty_tree_and_list(oc, idx, bmap, 1);
>> +            pthread_rwlock_unlock(&oc->lock);
>>      }
>>      close(fd);
>>  out:
>> @@ -651,20 +887,23 @@ static int object_cache_push(struct object_cache *oc)
>>              pthread_rwlock_rdlock(&oc->lock);
>>              bmap = entry->bmap;
>>              create = entry->flags & ENTRY_CREATE_BIT;
>> +            entry->bmap = 0;
>>              pthread_rwlock_unlock(&oc->lock);
>>  
>>              ret = push_cache_object(oc->vid, entry->idx, bmap, create);
>> -            if (ret != SD_RES_SUCCESS)
>> -                    goto push_failed;
>>  
>>              pthread_rwlock_wrlock(&oc->lock);
>> +            if (ret != SD_RES_SUCCESS) {
>> +                    entry->bmap |= bmap;
>> +                    goto push_failed;
>> +            }
>> +            entry->flags &= ~ENTRY_CREATE_BIT;
>>              del_from_dirty_tree_and_list(entry, &oc->dirty_tree);
>>              pthread_rwlock_unlock(&oc->lock);
>>      }
>>      return ret;
>>  
>>  push_failed:
>> -    pthread_rwlock_wrlock(&oc->lock);
>>      list_splice_init(&inactive_dirty_list, &oc->dirty_list);
>>      pthread_rwlock_unlock(&oc->lock);
>>  
>> @@ -681,10 +920,10 @@ int object_is_cached(uint64_t oid)
>>      if (!cache)
>>              return 0;
>>  
>> -    if (object_cache_lookup(cache, idx, 0) < 0)
>> -            return 0;
>> +    if (object_cache_lookup(cache, idx, 0) == SD_RES_SUCCESS)
>> +            return 1;
>>      else
>> -            return 1; /* found it */
>> +            return 0;
>>  }
>>  
>>  void object_cache_delete(uint32_t vid)
>> @@ -716,6 +955,52 @@ void object_cache_delete(uint32_t vid)
>>  
>>  }
>>  
>> +static void object_cache_flush_begin(struct object_cache *oc)
>> +{
>> +    pthread_rwlock_wrlock(&oc->lock);
>> +    oc->flushing = 1;
>> +    pthread_rwlock_unlock(&oc->lock);
>> +}
>> +
>> +static void object_cache_flush_end(struct object_cache *oc)
>> +{
>> +    pthread_rwlock_wrlock(&oc->lock);
>> +    oc->flushing = 0;
>> +    pthread_rwlock_unlock(&oc->lock);
>> +}
> 
> Why can't flushing be atomic? Better to fold these two helpers into their callers.
> 
>> +
>> +static int object_cache_access_begin(struct object_cache *cache, uint32_t 
>> idx,
>> +                                 struct object_cache_entry **entry)
>> +{
>> +    struct object_cache_entry *ent;
>> +    int ret = SD_RES_SUCCESS;
>> +
>> +    if (entry)
>> +            *entry = NULL;
>> +
>> +    pthread_rwlock_rdlock(&cache->lock);
>> +    ret = cache_sanity_check(cache, idx, &ent);
>> +    if (ret != SD_RES_SUCCESS) {
>> +            /* The cache entry may be reclaimed, so try again. */
>> +            pthread_rwlock_unlock(&cache->lock);
>> +            dprintf("cache sanity error %d\n", ret);
>> +            return ret;
>> +    }
>> +
>> +    uatomic_inc(&ent->refcnt);
>> +    pthread_rwlock_unlock(&cache->lock);
>> +
>> +    if (entry)
>> +            *entry = ent;
>> +
>> +    return ret;
>> +}
>> +
> 
> It is cleaner to call access_begin/end() in read/write_cache_object().
> 
>> +static void object_cache_access_end(struct object_cache_entry *entry)
>> +{
>> +    uatomic_dec(&entry->refcnt);
>> +}
>> +
>>  static int object_cache_flush_and_delete(struct object_cache *oc)
>>  {
>>      DIR *dir;
>> @@ -726,6 +1011,8 @@ static int object_cache_flush_and_delete(struct 
>> object_cache *oc)
>>      struct strbuf p;
>>      int ret = 0;
>>  
>> +    object_cache_flush_begin(oc);
>> +
>>      strbuf_init(&p, PATH_MAX);
>>      strbuf_addstr(&p, cache_dir);
>>      strbuf_addf(&p, "/%06"PRIx32, vid);
>> @@ -754,6 +1041,8 @@ static int object_cache_flush_and_delete(struct 
>> object_cache *oc)
>>      }
>>  
>>      object_cache_delete(vid);
>> +
>> +    object_cache_flush_end(oc);
>>  out:
>>      strbuf_release(&p);
>>      return ret;
>> @@ -777,10 +1066,10 @@ int bypass_object_cache(struct request *req)
>>                      /* For read requet, we can read cache if any */
>>                      uint32_t idx = object_cache_oid_to_idx(oid);
>>  
>> -                    if (object_cache_lookup(cache, idx, 0) < 0)
>> -                            return 1;
>> -                    else
>> +                    if (object_cache_lookup(cache, idx, 0) == 0)
>>                              return 0;
>> +                    else
>> +                            return 1;
>>              }
>>      }
>>  
>> @@ -811,37 +1100,42 @@ int object_cache_handle_request(struct request *req)
>>      if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
>>              create = 1;
>>  
>> -    if (object_cache_lookup(cache, idx, create) < 0) {
>> +retry:
>> +    ret = object_cache_lookup(cache, idx, create);
>> +    if (ret == SD_RES_NO_CACHE) {
>>              ret = object_cache_pull(cache, idx);
>>              if (ret != SD_RES_SUCCESS)
>>                      return ret;
>>      }
>>  
>> -    pthread_rwlock_rdlock(&cache->lock);
>> -    entry = object_tree_search(&cache->object_tree, idx);
>> -    pthread_rwlock_unlock(&cache->lock);
>> +    ret = object_cache_access_begin(cache, idx, &entry);
>> +    if (ret != SD_RES_SUCCESS) {
>> +            dprintf("retrying... %d\n", ret);
>> +            goto retry;
>> +    }
>>  
>>      if (hdr->flags & SD_FLAG_CMD_WRITE) {
>>              ret = write_cache_object(cache->vid, idx, req->data,
>>                                       hdr->data_length, hdr->obj.offset);
>>              if (ret != SD_RES_SUCCESS)
>> -                    goto out;
>> +                    goto err;
>>              update_cache_entry(cache, idx, hdr->data_length,
>>                                 hdr->obj.offset);
>>      } else {
>>              ret = read_cache_object(cache->vid, idx, req->data,
>>                                      hdr->data_length, hdr->obj.offset);
>>              if (ret != SD_RES_SUCCESS)
>> -                    goto out;
>> +                    goto err;
>>              req->rp.data_length = hdr->data_length;
>>  
>> -            if (entry) {
>> +            if (entry && !cache_is_reclaiming()) {
>>                      cds_list_del_rcu(&entry->lru_list);
>>                      cds_list_add_rcu(&entry->lru_list,
>>                                       &sys_cache.cache_lru_list);
>>              }
>>      }
>> -out:
>> +err:
>> +    object_cache_access_end(entry);
>>      return ret;
>>  }
>>  
>> @@ -851,13 +1145,23 @@ int object_cache_write(uint64_t oid, char *data, 
>> unsigned int datalen,
>>      uint32_t vid = oid_to_vid(oid);
>>      uint32_t idx = object_cache_oid_to_idx(oid);
>>      struct object_cache *cache;
>> +    struct object_cache_entry *entry;
>>      int ret;
>>  
>>      cache = find_object_cache(vid, 0);
>>  
>> +    dprintf("cache object write %" PRIx32 "\n", idx);
>> +
>> +    ret = object_cache_access_begin(cache, idx, &entry);
>> +    if (ret != SD_RES_SUCCESS)
>> +            panic("cache object %" PRIx32 " doesn't exist\n", idx);
>> +
>>      ret = write_cache_object(vid, idx, data, datalen, offset);
>>      if (ret == SD_RES_SUCCESS)
>>              update_cache_entry(cache, idx, datalen, offset);
>> +
>> +    object_cache_access_end(entry);
>> +
>>      return ret;
>>  }
>>  
>> @@ -866,20 +1170,40 @@ int object_cache_read(uint64_t oid, char *data, 
>> unsigned int datalen,
>>  {
>>      uint32_t vid = oid_to_vid(oid);
>>      uint32_t idx = object_cache_oid_to_idx(oid);
>> +    struct object_cache *cache;
>> +    struct object_cache_entry *entry;
>> +    int ret;
>> +
>> +    cache = find_object_cache(vid, 0);
>> +
>> +    dprintf("cache object read %" PRIx32 "\n", idx);
>> +
>> +    ret = object_cache_access_begin(cache, idx, &entry);
>> +    if (ret != SD_RES_SUCCESS)
>> +            panic("cache object %" PRIx32 " doesn't exist\n", idx);
>>  
>> -    return read_cache_object(vid, idx, data, datalen, offset);
>> +    ret = read_cache_object(vid, idx, data, datalen, offset);
>> +
>> +    object_cache_access_end(entry);
>> +
>> +    return ret;
>>  }
>>  
>>  int object_cache_flush_vdi(struct request *req)
>>  {
>>      uint32_t vid = oid_to_vid(req->rq.obj.oid);
>>      struct object_cache *cache;
>> +    int ret;
>>  
>>      cache = find_object_cache(vid, 0);
>>      if (!cache)
>>              return SD_RES_SUCCESS;
>>  
>> -    return object_cache_push(cache);
>> +    object_cache_flush_begin(cache);
>> +    ret = object_cache_push(cache);
>> +    object_cache_flush_end(cache);
>> +
>> +    return ret;
>>  }
>>  
>>  int object_cache_flush_and_del(struct request *req)
>> @@ -888,8 +1212,10 @@ int object_cache_flush_and_del(struct request *req)
>>      struct object_cache *cache;
>>  
>>      cache = find_object_cache(vid, 0);
>> +
>>      if (cache && object_cache_flush_and_delete(cache) < 0)
>>              return SD_RES_EIO;
>> +
>>      return SD_RES_SUCCESS;
>>  }
>>  
>> @@ -1003,6 +1329,7 @@ int object_cache_init(const char *p)
>>  
>>      CDS_INIT_LIST_HEAD(&sys_cache.cache_lru_list);
>>      uatomic_set(&sys_cache.cache_size, 0);
>> +    uatomic_set(&sys_cache.reclaiming, 0);
>>  
>>      ret = load_existing_cache();
>>  err:
>> diff --git a/sheep/sheep.c b/sheep/sheep.c
>> index 8b78669..29270e9 100644
>> --- a/sheep/sheep.c
>> +++ b/sheep/sheep.c
>> @@ -359,8 +359,9 @@ int main(int argc, char **argv)
>>      sys->deletion_wqueue = init_work_queue("deletion", true);
>>      sys->block_wqueue = init_work_queue("block", true);
>>      sys->sockfd_wqueue = init_work_queue("sockfd", true);
>> +    sys->reclaim_wqueue = init_work_queue("reclaim", true);
>>      if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
>> -        !sys->deletion_wqueue || !sys->block_wqueue)
>> +        !sys->deletion_wqueue || !sys->block_wqueue || !sys->reclaim_wqueue)
>>              exit(1);
>>  
>>      ret = trace_init();
>> diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
>> index fe61411..2090d67 100644
>> --- a/sheep/sheep_priv.h
>> +++ b/sheep/sheep_priv.h
>> @@ -125,6 +125,7 @@ struct cluster_info {
>>      struct work_queue *recovery_wqueue;
>>      struct work_queue *block_wqueue;
>>      struct work_queue *sockfd_wqueue;
>> +    struct work_queue *reclaim_wqueue;
>>  };
>>  
>>  struct siocb {
>> diff --git a/sheep/store.c b/sheep/store.c
>> index a05822d..4a65c9e 100644
>> --- a/sheep/store.c
>> +++ b/sheep/store.c
>> @@ -492,9 +492,13 @@ int write_object(uint64_t oid, char *data, unsigned int 
>> datalen,
>>      struct sd_req hdr;
>>      int ret;
>>  
>> +retry:
>>      if (sys->enable_write_cache && object_is_cached(oid)) {
>>              ret = object_cache_write(oid, data, datalen, offset,
>>                                       flags, create);
>> +            if (ret == SD_RES_NO_CACHE)
>> +                    goto retry;
>> +
>>              if (ret != 0) {
>>                      eprintf("write cache failed %"PRIx64" %"PRIx32"\n",
>>                              oid, ret);
>> @@ -529,8 +533,12 @@ int read_object(uint64_t oid, char *data, unsigned int 
>> datalen,
>>      struct sd_req hdr;
>>      int ret;
>>  
>> +retry:
>>      if (sys->enable_write_cache && object_is_cached(oid)) {
>>              ret = object_cache_read(oid, data, datalen, offset);
>> +            if (ret == SD_RES_NO_CACHE)
>> +                    goto retry;
>> +
>>              if (ret != SD_RES_SUCCESS) {
>>                      eprintf("try forward read %"PRIx64" %"PRIx32"\n",
>>                              oid, ret);
>>
> 


-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to