Author: dlee
Date: Fri Aug  2 11:48:05 2013
New Revision: 396134

URL: http://svnview.digium.com/svn/asterisk?view=rev&rev=396134
Log:
Forward _all messages to _all_cached

Modified:
    team/dlee/cache-pattern-fix/main/stasis_cache.c
    team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c
    team/dlee/cache-pattern-fix/tests/test_stasis.c

Modified: team/dlee/cache-pattern-fix/main/stasis_cache.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/main/stasis_cache.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/main/stasis_cache.c (original)
+++ team/dlee/cache-pattern-fix/main/stasis_cache.c Fri Aug  2 11:48:05 2013
@@ -426,8 +426,7 @@
 
        id = caching_topic->cache->id_fn(message);
        if (id == NULL) {
-               /* Object isn't cached; forward */
-               stasis_forward_message(caching_topic->topic, topic, message);
+               /* Object isn't cached; discard */
        } else {
                /* Update the cache */
                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);

Modified: team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c (original)
+++ team/dlee/cache-pattern-fix/main/stasis_cache_pattern.c Fri Aug  2 11:48:05 2013
@@ -38,14 +38,16 @@
        struct stasis_topic *topic;
        struct stasis_topic *topic_cached;
        struct stasis_cache *cache;
+
+       struct stasis_subscription *forward_all_to_cached;
 };
 
 struct stasis_cp_single {
        struct stasis_topic *topic;
        struct stasis_caching_topic *topic_cached;
 
-       struct stasis_subscription *forward;
-       struct stasis_subscription *forward_cached;
+       struct stasis_subscription *forward_topic_to_all;
+       struct stasis_subscription *forward_cached_to_all;
 };
 
 static void all_dtor(void *obj)
@@ -53,8 +55,13 @@
        struct stasis_cp_all *all = obj;
 
        ao2_cleanup(all->topic);
+       all->topic = NULL;
        ao2_cleanup(all->topic_cached);
+       all->topic_cached = NULL;
        ao2_cleanup(all->cache);
+       all->cache = NULL;
+       stasis_unsubscribe_and_join(all->forward_all_to_cached);
+       all->forward_all_to_cached = NULL;
 }
 
 struct stasis_cp_all *stasis_cp_all_create(const char *name,
@@ -76,8 +83,11 @@
        all->topic = stasis_topic_create(name);
        all->topic_cached = stasis_topic_create(cached_name);
        all->cache = stasis_cache_create(id_fn);
+       all->forward_all_to_cached =
+               stasis_forward_all(all->topic, all->topic_cached);
 
-       if (!all->topic || !all->topic_cached || !all->cache) {
+       if (!all->topic || !all->topic_cached || !all->cache ||
+               !all->forward_all_to_cached) {
                return NULL;
        }
 
@@ -116,8 +126,8 @@
 
        /* Should already be unsubscribed */
        ast_assert(one->topic_cached == NULL);
-       ast_assert(one->forward == NULL);
-       ast_assert(one->forward_cached == NULL);
+       ast_assert(one->forward_topic_to_all == NULL);
+       ast_assert(one->forward_cached_to_all == NULL);
 
        ao2_cleanup(one->topic);
        one->topic = NULL;
@@ -142,13 +152,13 @@
                return NULL;
        }
 
-       one->forward = stasis_forward_all(one->topic, all->topic);
-       if (!one->forward) {
+       one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
+       if (!one->forward_topic_to_all) {
                return NULL;
        }
-       one->forward_cached = stasis_forward_all(
+       one->forward_cached_to_all = stasis_forward_all(
                stasis_caching_get_topic(one->topic_cached), all->topic_cached);
-       if (!one->forward_cached) {
+       if (!one->forward_cached_to_all) {
                return NULL;
        }
 
@@ -164,10 +174,10 @@
 
        stasis_caching_unsubscribe(one->topic_cached);
        one->topic_cached = NULL;
-       stasis_unsubscribe(one->forward);
-       one->forward = NULL;
-       stasis_unsubscribe(one->forward_cached);
-       one->forward_cached = NULL;
+       stasis_unsubscribe(one->forward_topic_to_all);
+       one->forward_topic_to_all = NULL;
+       stasis_unsubscribe(one->forward_cached_to_all);
+       one->forward_cached_to_all = NULL;
 }
 
 struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)

Modified: team/dlee/cache-pattern-fix/tests/test_stasis.c
URL: http://svnview.digium.com/svn/asterisk/team/dlee/cache-pattern-fix/tests/test_stasis.c?view=diff&rev=396134&r1=396133&r2=396134
==============================================================================
--- team/dlee/cache-pattern-fix/tests/test_stasis.c (original)
+++ team/dlee/cache-pattern-fix/tests/test_stasis.c Fri Aug  2 11:48:05 2013
@@ -610,7 +610,7 @@
        return cachable->id;
 }
 
-AST_TEST_DEFINE(cache_passthrough)
+AST_TEST_DEFINE(cache_filter)
 {
         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -620,14 +620,13 @@
        RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
        RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
        int actual_len;
-       struct stasis_message_type *actual_type;
-
-       switch (cmd) {
-       case TEST_INIT:
-               info->name = __func__;
-               info->category = test_category;
-               info->summary = "Test passing messages through cache topic unscathed.";
-               info->description = "Test passing messages through cache topic unscathed.";
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category;
+               info->summary = "Test caching topics only forward cache_update messages.";
+               info->description = "Test caching topics only forward cache_update messages.";
                return AST_TEST_NOT_RUN;
        case TEST_EXECUTE:
                break;
@@ -652,13 +651,8 @@
 
        stasis_publish(topic, test_message);
 
-       actual_len = consumer_wait_for(consumer, 1);
-       ast_test_validate(test, 1 == actual_len);
-
-       actual_type = stasis_message_type(consumer->messages_rxed[0]);
-       ast_test_validate(test, non_cache_type == actual_type);
-
-       ast_test_validate(test, test_message == consumer->messages_rxed[0]);
+       actual_len = consumer_should_stay(consumer, 0);
+       ast_test_validate(test, 0 == actual_len);
 
        return AST_TEST_PASS;
 }
@@ -1113,8 +1107,9 @@
        ast_test_validate(test, 1 == actual_len);
        actual_len = consumer_wait_for(consumer2, 1);
        ast_test_validate(test, 1 == actual_len);
-       actual_len = consumer_wait_for(consumer3, 1);
-       ast_test_validate(test, 1 == actual_len);
+       /* Uncacheable message should not be passed through */
+       actual_len = consumer_should_stay(consumer3, 0);
+       ast_test_validate(test, 0 == actual_len);
 
        actual = consumer1->messages_rxed[0];
         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
@@ -1127,9 +1122,6 @@
        update = stasis_message_data(actual);
        ast_test_validate(test, test_message_type2 == update->type);
        ast_test_validate(test, test_message2 == update->new_snapshot);
-
-       actual = consumer3->messages_rxed[0];
-       ast_test_validate(test, test_message3 == actual);
 
        /* consumer1 and consumer2 do not get the final message. */
        ao2_cleanup(consumer1);
@@ -1287,7 +1279,7 @@
        AST_TEST_UNREGISTER(publish);
        AST_TEST_UNREGISTER(unsubscribe_stops_messages);
        AST_TEST_UNREGISTER(forward);
-       AST_TEST_UNREGISTER(cache_passthrough);
+       AST_TEST_UNREGISTER(cache_filter);
        AST_TEST_UNREGISTER(cache);
        AST_TEST_UNREGISTER(cache_dump);
        AST_TEST_UNREGISTER(route_conflicts);
@@ -1309,7 +1301,7 @@
        AST_TEST_REGISTER(publish);
        AST_TEST_REGISTER(unsubscribe_stops_messages);
        AST_TEST_REGISTER(forward);
-       AST_TEST_REGISTER(cache_passthrough);
+       AST_TEST_REGISTER(cache_filter);
        AST_TEST_REGISTER(cache);
        AST_TEST_REGISTER(cache_dump);
        AST_TEST_REGISTER(route_conflicts);


--
_____________________________________________________________________
-- Bandwidth and Colocation Provided by http://www.api-digital.com --

svn-commits mailing list
To UNSUBSCRIBE or update options visit:
   http://lists.digium.com/mailman/listinfo/svn-commits

Reply via email to