http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c index 2dcec25,0000000..9929437 mode 100644,000000..100644 --- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c +++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c @@@ -1,1040 -1,0 +1,1059 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_admin_impl.c + * + * \date Sep 30, 2011 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include "pubsub_admin_impl.h" +#include <zmq.h> + +#include <stdio.h> +#include <stdlib.h> + +#include <arpa/inet.h> +#include <sys/socket.h> +#include <netdb.h> + +#ifndef ANDROID +#include <ifaddrs.h> +#endif + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> + +#include "constants.h" +#include "utils.h" +#include "hash_map.h" +#include "array_list.h" +#include "bundle_context.h" +#include "bundle.h" +#include "service_reference.h" +#include "service_registration.h" +#include "log_helper.h" +#include "log_service.h" +#include "celix_threads.h" +#include "service_factory.h" + +#include "topic_subscription.h" +#include "topic_publication.h" +#include "pubsub_endpoint.h" +#include "pubsub_utils.h" +#include "pubsub/subscriber.h" + +#define MAX_KEY_FOLDER_PATH_LENGTH 512 + +static const char *DEFAULT_IP = "127.0.0.1"; + +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip); +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP); + +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc); +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication); +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication); + +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt *admin) { + celix_status_t status = CELIX_SUCCESS; + +#ifdef BUILD_WITH_ZMQ_SECURITY + if (!zsys_has_curve()){ + printf("PSA_ZMQ: zeromq curve unsupported\n"); + return CELIX_SERVICE_EXCEPTION; + } +#endif + + *admin = calloc(1, sizeof(**admin)); + + if (!*admin) { + status = CELIX_ENOMEM; + } + else{ + + const char *ip = NULL; + char *detectedIp = NULL; + (*admin)->bundle_context= context; + (*admin)->localPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->pendingSubscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->externalPublications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*admin)->topicSubscriptionsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + (*admin)->topicPublicationsPerSerializer = hashMap_create(NULL, NULL, NULL, NULL); + arrayList_create(&((*admin)->noSerializerSubscriptions)); + arrayList_create(&((*admin)->noSerializerPublications)); + arrayList_create(&((*admin)->serializerList)); + + celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL); + celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL); + celixThreadMutex_create(&(*admin)->externalPublicationsLock, NULL); + celixThreadMutex_create(&(*admin)->serializerListLock, NULL); + celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL); + + celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr); + celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, CELIX_THREAD_MUTEX_RECURSIVE); + celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, &(*admin)->noSerializerPendingsAttr); + + celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr); + celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, CELIX_THREAD_MUTEX_RECURSIVE); + celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, &(*admin)->pendingSubscriptionsAttr); + + if (logHelper_create(context, &(*admin)->loghelper) == CELIX_SUCCESS) { + logHelper_start((*admin)->loghelper); + } + + bundleContext_getProperty(context,PSA_IP , &ip); + +#ifndef ANDROID + if (ip == NULL) { + const char *interface = NULL; + + bundleContext_getProperty(context, PSA_ITF, &interface); + if (pubsubAdmin_getIpAdress(interface, &detectedIp) != CELIX_SUCCESS) { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface %s", interface); + } + + ip = detectedIp; + } +#endif + + if (ip != NULL) { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip); + (*admin)->ipAddress = strdup(ip); + } + else { + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. Using %s", DEFAULT_IP); + (*admin)->ipAddress = strdup(DEFAULT_IP); + } + + if (detectedIp != NULL) { + free(detectedIp); + } + + const char* basePortStr = NULL; + const char* maxPortStr = NULL; + char* endptrBase = NULL; + char* endptrMax = NULL; + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr); + bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, "PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr); + (*admin)->basePort = strtol(basePortStr, &endptrBase, 10); + (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10); + if (*endptrBase != '\0') { + (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT; + } + if (*endptrMax != '\0') { + (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT; + } + + printf("PSA Using base port %u to max port %u\n", (*admin)->basePort, (*admin)->maxPort); + + // Disable Signal Handling by CZMQ + setenv("ZSYS_SIGHANDLER", "false", true); + + const char *nrZmqThreads = NULL; + bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", &nrZmqThreads); + + if(nrZmqThreads != NULL) { + char *endPtr = NULL; + unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 10); + if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads < 50) { + zsys_set_io_threads(nrThreads); + logHelper_log((*admin)->loghelper, OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads); + printf("PSA_ZMQ: Using %d threads for ZMQ\n", nrThreads); + } + } + +#ifdef BUILD_WITH_ZMQ_SECURITY + // Setup authenticator + zactor_t* auth = zactor_new (zauth, NULL); + zstr_sendx(auth, "VERBOSE", NULL); + + // Load all public keys of subscribers into the application + // This step is done for authenticating subscribers + char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH]; + char* keys_bundle_dir = pubsub_getKeysBundleDir(context); + snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, "%s/META-INF/keys/subscriber/public", keys_bundle_dir); + zstr_sendx (auth, "CURVE", curve_folder_path, NULL); + free(keys_bundle_dir); + + (*admin)->zmq_auth = auth; +#endif + + } + + return status; +} + + +celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin) +{ + celix_status_t status = CELIX_SUCCESS; + + free(admin->ipAddress); + + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->pendingSubscriptions); + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free((char*)hashMapEntry_getKey(entry)); + arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->pendingSubscriptions,false,false); + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + celixThreadMutex_lock(&admin->subscriptionsLock); + hashMap_destroy(admin->subscriptions,false,false); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + celixThreadMutex_lock(&admin->localPublicationsLock); + hashMap_destroy(admin->localPublications,true,false); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + celixThreadMutex_lock(&admin->externalPublicationsLock); + iter = hashMapIterator_create(admin->externalPublications); + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free((char*)hashMapEntry_getKey(entry)); + arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->externalPublications,false,false); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_destroy(admin->serializerList); + celixThreadMutex_unlock(&admin->serializerListLock); + + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_destroy(admin->noSerializerSubscriptions); + arrayList_destroy(admin->noSerializerPublications); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + + celixThreadMutex_lock(&admin->usedSerializersLock); + + iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false); + + iter = hashMapIterator_create(admin->topicPublicationsPerSerializer); + while(hashMapIterator_hasNext(iter)){ + arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(admin->topicPublicationsPerSerializer,false,false); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + + celixThreadMutex_destroy(&admin->usedSerializersLock); + celixThreadMutex_destroy(&admin->serializerListLock); + celixThreadMutex_destroy(&admin->pendingSubscriptionsLock); + + celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr); + celixThreadMutex_destroy(&admin->noSerializerPendingsLock); + + celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr); + celixThreadMutex_destroy(&admin->subscriptionsLock); + + celixThreadMutex_destroy(&admin->localPublicationsLock); + celixThreadMutex_destroy(&admin->externalPublicationsLock); + + logHelper_stop(admin->loghelper); + + logHelper_destroy(&admin->loghelper); + +#ifdef BUILD_WITH_ZMQ_SECURITY + if (admin->zmq_auth != NULL){ + zactor_destroy(&(admin->zmq_auth)); + } +#endif + + free(admin); + + return status; +} + +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt any_sub = hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); + + if(any_sub==NULL){ + + int i; + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ + status = pubsub_topicSubscriptionCreate(admin->bundle_context, PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, &any_sub); + } + else{ - printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); ++ printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n", ++ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + if (status == CELIX_SUCCESS){ + + /* Connect all internal publishers */ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt lp_iter =hashMapIterator_create(admin->localPublications); + while(hashMapIterator_hasNext(lp_iter)){ + service_factory_pt factory = (service_factory_pt)hashMapIterator_nextValue(lp_iter); + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if(topic_publishers!=NULL){ + for(i=0;i<arrayList_size(topic_publishers);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); ++ if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ ++ status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + arrayList_destroy(topic_publishers); + } + } + hashMapIterator_destroy(lp_iter); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Connect also all external publishers */ + celixThreadMutex_lock(&admin->externalPublicationsLock); + hash_map_iterator_pt extp_iter =hashMapIterator_create(admin->externalPublications); + while(hashMapIterator_hasNext(extp_iter)){ + array_list_pt ext_pub_list = (array_list_pt)hashMapIterator_nextValue(extp_iter); + if(ext_pub_list!=NULL){ + for(i=0;i<arrayList_size(ext_pub_list);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint); ++ if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ ++ status += pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + } + } + hashMapIterator_destroy(extp_iter); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + + + pubsub_topicSubscriptionAddSubscriber(any_sub,subEP); + + status += pubsub_topicSubscriptionStart(any_sub); + + } + + if (status == CELIX_SUCCESS){ + hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub); + connectTopicPubSubToSerializer(admin, best_serializer, any_sub, false); + } + + } + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; +} + +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + - printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, subEP->topic); ++ printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", ++ properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ subEP->serviceID, ++ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), ++ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + - if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){ ++ if(strcmp(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){ + return pubsubAdmin_addAnySubscription(admin,subEP); + } + + /* Check if we already know some publisher about this topic, otherwise let's put the subscription in the pending hashmap */ + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + celixThreadMutex_lock(&admin->subscriptionsLock); + celixThreadMutex_lock(&admin->localPublicationsLock); + celixThreadMutex_lock(&admin->externalPublicationsLock); + - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); ++ char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + + service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); + array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); + + if(factory==NULL && ext_pub_list==NULL){ //No (local or external) publishers yet for this topic + pubsubAdmin_addSubscriptionToPendingList(admin,subEP); + } + else{ + int i; + topic_subscription_pt subscription = hashMap_get(admin->subscriptions, scope_topic); + + if(subscription == NULL) { + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, subEP, &best_serializer)) == CELIX_SUCCESS){ - status += pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, subEP->topic, best_serializer, &subscription); ++ status += pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, &subscription); + } + else{ - printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n",subEP->topic); ++ printf("PSA_ZMQ: Cannot find a serializer for subscribing topic %s. Adding it to pending list.\n", ++ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + if (status==CELIX_SUCCESS){ + + /* Try to connect internal publishers */ + if(factory!=NULL){ + topic_publication_pt topic_pubs = (topic_publication_pt)factory->handle; + array_list_pt topic_publishers = pubsub_topicPublicationGetPublisherList(topic_pubs); + + if(topic_publishers!=NULL){ + for(i=0;i<arrayList_size(topic_publishers);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); ++ if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ ++ status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + arrayList_destroy(topic_publishers); + } + + } + + /* Look also for external publishers */ + if(ext_pub_list!=NULL){ + for(i=0;i<arrayList_size(ext_pub_list);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if(pubEP->endpoint !=NULL){ - status += pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint); ++ if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){ ++ status += pubsub_topicSubscriptionConnectPublisher(subscription,(char*) properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + } + } + + pubsub_topicSubscriptionAddSubscriber(subscription,subEP); + + status += pubsub_topicSubscriptionStart(subscription); + + } + + if(status==CELIX_SUCCESS){ + + hashMap_put(admin->subscriptions,strdup(scope_topic),subscription); + + connectTopicPubSubToSerializer(admin, best_serializer, subscription, false); + } + } + + if (status == CELIX_SUCCESS){ + pubsub_topicIncreaseNrSubscribers(subscription); + } + } + + free(scope_topic); + celixThreadMutex_unlock(&admin->externalPublicationsLock); + celixThreadMutex_unlock(&admin->localPublicationsLock); + celixThreadMutex_unlock(&admin->subscriptionsLock); + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + return status; + +} + +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; + - printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic); ++ printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld topic=%s]\n", ++ properties_get(subEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ subEP->serviceID, ++ properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); ++ char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + + celixThreadMutex_lock(&admin->subscriptionsLock); + topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); + if(sub!=NULL){ + pubsub_topicDecreaseNrSubscribers(sub); + if(pubsub_topicGetNrSubscribers(sub) == 0) { + status = pubsub_topicSubscriptionRemoveSubscriber(sub,subEP); + } + } + celixThreadMutex_unlock(&admin->subscriptionsLock); + + if(sub==NULL){ + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerSubscriptions, subEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + free(scope_topic); + + + + return status; + +} + +celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; + - printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, pubEP->topic); ++ printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, topic=%s]\n", ++ properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ pubEP->serviceID, ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + + const char* fwUUID = NULL; + + bundleContext_getProperty(admin->bundle_context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); + if (fwUUID == NULL) { + printf("PSA_ZMQ: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); ++ char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + - if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == NULL)) { ++ if ((strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) && ++ (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) == NULL)) { + + celixThreadMutex_lock(&admin->localPublicationsLock); + + service_factory_pt factory = (service_factory_pt) hashMap_get(admin->localPublications, scope_topic); + + if (factory == NULL) { + topic_publication_pt pub = NULL; + pubsub_serializer_service_t *best_serializer = NULL; + if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, &best_serializer)) == CELIX_SUCCESS){ + status = pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, admin->ipAddress, admin->basePort, admin->maxPort, &pub); + } + else{ - printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", pubEP->topic); ++ printf("PSA_ZMQ: Cannot find a serializer for publishing topic %s. Adding it to pending list.\n", ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + if (status == CELIX_SUCCESS) { + status = pubsub_topicPublicationStart(admin->bundle_context, pub, &factory); + if (status == CELIX_SUCCESS && factory != NULL) { + hashMap_put(admin->localPublications, strdup(scope_topic), factory); + connectTopicPubSubToSerializer(admin, best_serializer, pub, true); + } + } else { - printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, pubEP->topic, pubEP->serviceID); ++ printf("PSA_ZMQ: Cannot create a topicPublication for scope=%s, topic=%s (bundle %ld).\n", ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ pubEP->serviceID); + } + } else { + //just add the new EP to the list + topic_publication_pt pub = (topic_publication_pt) factory->handle; + pubsub_topicPublicationAddPublisherEP(pub, pubEP); + } + + celixThreadMutex_unlock(&admin->localPublicationsLock); + } + else{ + + celixThreadMutex_lock(&admin->externalPublicationsLock); + array_list_pt ext_pub_list = (array_list_pt) hashMap_get(admin->externalPublications, scope_topic); + if (ext_pub_list == NULL) { + arrayList_create(&ext_pub_list); + hashMap_put(admin->externalPublications, strdup(scope_topic), ext_pub_list); + } + + arrayList_add(ext_pub_list, pubEP); + + celixThreadMutex_unlock(&admin->externalPublicationsLock); + } + + /* Re-evaluate the pending subscriptions */ + celixThreadMutex_lock(&admin->pendingSubscriptionsLock); + + hash_map_entry_pt pendingSub = hashMap_getEntry(admin->pendingSubscriptions, scope_topic); + if (pendingSub != NULL) { //There were pending subscription for the just published topic. Let's connect them. + char* topic = (char*) hashMapEntry_getKey(pendingSub); + array_list_pt pendingSubList = (array_list_pt) hashMapEntry_getValue(pendingSub); + int i; + for (i = 0; i < arrayList_size(pendingSubList); i++) { + pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) arrayList_get(pendingSubList, i); + pubsubAdmin_addSubscription(admin, subEP); + } + hashMap_remove(admin->pendingSubscriptions, scope_topic); + arrayList_clear(pendingSubList); + arrayList_destroy(pendingSubList); + free(topic); + } + + celixThreadMutex_unlock(&admin->pendingSubscriptionsLock); + + /* Connect the new publisher to the subscription for his topic, if there is any */ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, scope_topic); - if (sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, pubEP->endpoint); ++ if (sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) { ++ pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt) hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC); - if (any_sub != NULL && pubEP->endpoint != NULL) { - pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, pubEP->endpoint); ++ if (any_sub != NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) != NULL) { ++ pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, (char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + free(scope_topic); + + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + +celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt admin,pubsub_endpoint_pt pubEP){ + celix_status_t status = CELIX_SUCCESS; + int count = 0; + - printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic); ++ printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld topic=%s]\n", ++ properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ pubEP->serviceID, ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + + const char* fwUUID = NULL; + + bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + if(fwUUID==NULL){ + printf("PSA_ZMQ: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } - char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic); ++ char *scope_topic = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + - if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ ++ if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){ + + celixThreadMutex_lock(&admin->localPublicationsLock); + service_factory_pt factory = (service_factory_pt)hashMap_get(admin->localPublications,scope_topic); + if(factory!=NULL){ + topic_publication_pt pub = (topic_publication_pt)factory->handle; + pubsub_topicPublicationRemovePublisherEP(pub,pubEP); + } + celixThreadMutex_unlock(&admin->localPublicationsLock); + + if(factory==NULL){ + /* Maybe the endpoint was pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){ + status = CELIX_ILLEGAL_STATE; + } + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + } + else{ + + celixThreadMutex_lock(&admin->externalPublicationsLock); + array_list_pt ext_pub_list = (array_list_pt)hashMap_get(admin->externalPublications,scope_topic); + if(ext_pub_list!=NULL){ + int i; + bool found = false; + for(i=0;!found && i<arrayList_size(ext_pub_list);i++){ + pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); + found = pubsubEndpoint_equals(pubEP,p); + if (found){ + arrayList_remove(ext_pub_list,i); + } + } + // Check if there are more publishers on the same endpoint (happens when 1 celix-instance with multiple bundles publish in same topic) + for(i=0; i<arrayList_size(ext_pub_list);i++) { + pubsub_endpoint_pt p = (pubsub_endpoint_pt)arrayList_get(ext_pub_list,i); - if (strcmp(pubEP->endpoint,p->endpoint) == 0) { ++ if (strcmp(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 0) { + count++; + } + } + + if(arrayList_size(ext_pub_list)==0){ + hash_map_entry_pt entry = hashMap_getEntry(admin->externalPublications,scope_topic); + char* topic = (char*)hashMapEntry_getKey(entry); + array_list_pt list = (array_list_pt)hashMapEntry_getValue(entry); + hashMap_remove(admin->externalPublications,topic); + arrayList_destroy(list); + free(topic); + } + } + + celixThreadMutex_unlock(&admin->externalPublicationsLock); + } + + /* Check if this publisher was connected to one of our subscribers*/ + celixThreadMutex_lock(&admin->subscriptionsLock); + + topic_subscription_pt sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic); - if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint); ++ if(sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){ ++ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + /* And check also for ANY subscription */ + topic_subscription_pt any_sub = (topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC); - if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){ - pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint); ++ if(any_sub!=NULL && properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL && count == 0){ ++ pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + } + + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char *scope, char* topic){ + celix_status_t status = CELIX_SUCCESS; + + printf("PSA_ZMQ: Closing all publications\n"); + + celixThreadMutex_lock(&admin->localPublicationsLock); + char *scope_topic = createScopeTopicKey(scope, topic); + hash_map_entry_pt pubsvc_entry = (hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic); + if(pubsvc_entry!=NULL){ + char* key = (char*)hashMapEntry_getKey(pubsvc_entry); + service_factory_pt factory= (service_factory_pt)hashMapEntry_getValue(pubsvc_entry); + topic_publication_pt pub = (topic_publication_pt)factory->handle; + status += pubsub_topicPublicationStop(pub); + disconnectTopicPubSubFromSerializer(admin, pub, true); + status += pubsub_topicPublicationDestroy(pub); + hashMap_remove(admin->localPublications,scope_topic); + free(key); + free(factory); + } + free(scope_topic); + celixThreadMutex_unlock(&admin->localPublicationsLock); + + return status; + +} + +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* scope,char* topic){ + celix_status_t status = CELIX_SUCCESS; + + printf("PSA_ZMQ: Closing all subscriptions\n"); + + celixThreadMutex_lock(&admin->subscriptionsLock); + char *scope_topic = createScopeTopicKey(scope, topic); + hash_map_entry_pt sub_entry = (hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic); + if(sub_entry!=NULL){ + char* topic = (char*)hashMapEntry_getKey(sub_entry); + + topic_subscription_pt ts = (topic_subscription_pt)hashMapEntry_getValue(sub_entry); + status += pubsub_topicSubscriptionStop(ts); + disconnectTopicPubSubFromSerializer(admin, ts, false); + status += pubsub_topicSubscriptionDestroy(ts); + hashMap_remove(admin->subscriptions,scope_topic); + free(topic); + + } + free(scope_topic); + celixThreadMutex_unlock(&admin->subscriptionsLock); + + return status; + +} + + +#ifndef ANDROID +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** ip) { + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + struct ifaddrs *ifaddr, *ifa; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) != -1) + { + for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == NULL) + continue; + + if ((getnameinfo(ifa->ifa_addr,sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && (ifa->ifa_addr->sa_family == AF_INET)) { + if (interface == NULL) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + else if (strcmp(ifa->ifa_name, interface) == 0) { + *ip = strdup(host); + status = CELIX_SUCCESS; + } + } + } + + freeifaddrs(ifaddr); + } + + return status; +} +#endif + +static celix_status_t pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt admin,pubsub_endpoint_pt subEP){ + celix_status_t status = CELIX_SUCCESS; - char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic); ++ char* scope_topic = createScopeTopicKey(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pendingListPerTopic = hashMap_get(admin->pendingSubscriptions,scope_topic); + if(pendingListPerTopic==NULL){ + arrayList_create(&pendingListPerTopic); + hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic); + } + arrayList_add(pendingListPerTopic,subEP); + free(scope_topic); + return status; +} + +celix_status_t pubsubAdmin_serializerAdded(void * handle, service_reference_pt reference, void * service){ + /* Assumption: serializers are all available at startup. + * If a new (possibly better) serializer is installed and started, already created topic_publications/subscriptions will not be destroyed and recreated */ + + celix_status_t status = CELIX_SUCCESS; + int i=0; + + const char *serType = NULL; + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType == NULL){ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); + return CELIX_SERVICE_EXCEPTION; + } + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + celixThreadMutex_lock(&admin->serializerListLock); + arrayList_add(admin->serializerList, reference); + celixThreadMutex_unlock(&admin->serializerListLock); + + /* Now let's re-evaluate the pending */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + + for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addSubscription(admin, ep); + } + } + + for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){ + pubsub_endpoint_pt ep = (pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i); + pubsub_serializer_service_t *best_serializer = NULL; + pubsubAdmin_getBestSerializer(admin, ep, &best_serializer); + if(best_serializer != NULL){ /* Finally we have a valid serializer! */ + pubsubAdmin_addPublication(admin, ep); + } + } + + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + + printf("PSA_ZMQ: %s serializer added\n",serType); + + return status; +} + +celix_status_t pubsubAdmin_serializerRemoved(void * handle, service_reference_pt reference, void * service){ + + pubsub_admin_pt admin = (pubsub_admin_pt)handle; + int i=0, j=0; + const char *serType = NULL; + + serviceReference_getProperty(reference, PUBSUB_SERIALIZER_TYPE_KEY,&serType); + if(serType == NULL){ + printf("Serializer serviceReference %p has no pubsub_serializer.type property specified\n",reference); + return CELIX_SERVICE_EXCEPTION; + } + + celixThreadMutex_lock(&admin->serializerListLock); + /* Remove the serializer from the list */ + arrayList_removeElement(admin->serializerList, reference); + celixThreadMutex_unlock(&admin->serializerListLock); + + + celixThreadMutex_lock(&admin->usedSerializersLock); + array_list_pt topicPubList = (array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service); + array_list_pt topicSubList = (array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service); + celixThreadMutex_unlock(&admin->usedSerializersLock); + + /* Now destroy the topicPublications, but first put back the pubsub_endpoints back to the noSerializer pending list */ + if(topicPubList!=NULL){ + for(i=0;i<arrayList_size(topicPubList);i++){ + topic_publication_pt topicPub = (topic_publication_pt)arrayList_get(topicPubList,i); + /* Stop the topic publication */ + pubsub_topicPublicationStop(topicPub); + /* Get the endpoints that are going to be orphan */ + array_list_pt pubList = pubsub_topicPublicationGetPublisherList(topicPub); + for(j=0;j<arrayList_size(pubList);j++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubList,j); + /* Remove the publication */ + pubsubAdmin_removePublication(admin, pubEP); + /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(pubEP->endpoint!=NULL){ - free(pubEP->endpoint); - pubEP->endpoint = NULL; ++ if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){ ++ properties_unset(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL); + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerPublications,pubEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + arrayList_destroy(pubList); + + /* Cleanup also the localPublications hashmap*/ + celixThreadMutex_lock(&admin->localPublicationsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->localPublications); + char *key = NULL; + service_factory_pt factory = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + factory = (service_factory_pt)hashMapEntry_getValue(entry); + topic_publication_pt pub = (topic_publication_pt)factory->handle; + if(pub==topicPub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->localPublications, key); + free(factory); + free(key); + } + celixThreadMutex_unlock(&admin->localPublicationsLock); + + /* Finally destroy the topicPublication */ + pubsub_topicPublicationDestroy(topicPub); + } + arrayList_destroy(topicPubList); + } + + /* Now destroy the topicSubscriptions, but first put back the pubsub_endpoints back to the noSerializer pending list */ + if(topicSubList!=NULL){ + for(i=0;i<arrayList_size(topicSubList);i++){ + topic_subscription_pt topicSub = (topic_subscription_pt)arrayList_get(topicSubList,i); + /* Stop the topic subscription */ + pubsub_topicSubscriptionStop(topicSub); + /* Get the endpoints that are going to be orphan */ + array_list_pt subList = pubsub_topicSubscriptionGetSubscribersList(topicSub); + for(j=0;j<arrayList_size(subList);j++){ + pubsub_endpoint_pt subEP = (pubsub_endpoint_pt)arrayList_get(subList,j); + /* Remove the subscription */ + pubsubAdmin_removeSubscription(admin, subEP); + /* Reset the endpoint field, so that will be recreated from scratch when a new serializer will be found */ - if(subEP->endpoint!=NULL){ - free(subEP->endpoint); - subEP->endpoint = NULL; ++ if(properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL){ ++ properties_unset(subEP->endpoint_props, PUBSUB_ENDPOINT_URL); + } + /* Add the orphan endpoint to the noSerializer pending list */ + celixThreadMutex_lock(&admin->noSerializerPendingsLock); + arrayList_add(admin->noSerializerSubscriptions,subEP); + celixThreadMutex_unlock(&admin->noSerializerPendingsLock); + } + + /* Cleanup also the subscriptions hashmap*/ + celixThreadMutex_lock(&admin->subscriptionsLock); + hash_map_iterator_pt iter = hashMapIterator_create(admin->subscriptions); + char *key = NULL; + while(hashMapIterator_hasNext(iter)){ + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + topic_subscription_pt sub = (topic_subscription_pt)hashMapEntry_getValue(entry); + if(sub==topicSub){ + key = (char*)hashMapEntry_getKey(entry); + break; + } + } + hashMapIterator_destroy(iter); + if(key!=NULL){ + hashMap_remove(admin->subscriptions, key); + free(key); + } + celixThreadMutex_unlock(&admin->subscriptionsLock); + + /* Finally destroy the topicSubscription */ + pubsub_topicSubscriptionDestroy(topicSub); + } + arrayList_destroy(topicSubList); + } + + + + printf("PSA_ZMQ: %s serializer removed\n",serType); + + + return CELIX_SUCCESS; +} + +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, pubsub_endpoint_pt endpoint, double* score){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score); + celixThreadMutex_unlock(&admin->serializerListLock); + + return status; +} + +/* This one recall the same logic as in the match function */ +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){ + + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&admin->serializerListLock); + status = pubsub_admin_get_best_serializer(ep->topic_props, admin->serializerList, serSvc); + celixThreadMutex_unlock(&admin->serializerListLock); + + return status; + +} + +static void connectTopicPubSubToSerializer(pubsub_admin_pt admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + array_list_pt list = (array_list_pt)hashMap_get(map,serializer); + if(list==NULL){ + arrayList_create(&list); + hashMap_put(map,serializer,list); + } + arrayList_add(list,topicPubSub); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +} + +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void *topicPubSub,bool isPublication){ + + celixThreadMutex_lock(&admin->usedSerializersLock); + + hash_map_pt map = isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer; + hash_map_iterator_pt iter = hashMapIterator_create(map); + while(hashMapIterator_hasNext(iter)){ + array_list_pt list = (array_list_pt)hashMapIterator_nextValue(iter); + if(arrayList_removeElement(list, topicPubSub)){ //Found it! + break; + } + } + hashMapIterator_destroy(iter); + + celixThreadMutex_unlock(&admin->usedSerializersLock); + +}
http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_zmq/src/topic_publication.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_admin_zmq/src/topic_publication.c index b612605,0000000..873cec2 mode 100644,000000..100644 --- a/pubsub/pubsub_admin_zmq/src/topic_publication.c +++ b/pubsub/pubsub_admin_zmq/src/topic_publication.c @@@ -1,630 -1,0 +1,631 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <czmq.h> +/* The following undefs prevent the collision between: + * - sys/syslog.h (which is included within czmq) + * - celix/dfi/dfi_log_util.h + */ +#undef LOG_DEBUG +#undef LOG_WARNING +#undef LOG_INFO +#undef LOG_WARNING + +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "array_list.h" +#include "celixbool.h" +#include "service_registration.h" +#include "utils.h" +#include "service_factory.h" +#include "version.h" + +#include "pubsub_common.h" +#include "pubsub_utils.h" +#include "pubsub/publisher.h" + +#include "topic_publication.h" + +#include "pubsub_serializer.h" + +#ifdef BUILD_WITH_ZMQ_SECURITY + #include "zmq_crypto.h" + + #define MAX_CERT_PATH_LENGTH 512 +#endif + +#define EP_ADDRESS_LEN 32 +#define ZMQ_BIND_MAX_RETRY 5 + +#define FIRST_SEND_DELAY 2 + +struct topic_publication { + zsock_t* zmq_socket; + celix_thread_mutex_t socket_lock; //Protects zmq_socket access + zcert_t * zmq_cert; + char* endpoint; + service_registration_pt svcFactoryReg; + array_list_pt pub_ep_list; //List<pubsub_endpoint> + hash_map_pt boundServices; //<bundle_pt,bound_service> + pubsub_serializer_service_t *serializer; + celix_thread_mutex_t tp_lock; +}; + +typedef struct publish_bundle_bound_service { + topic_publication_pt parent; + pubsub_publisher_t service; + bundle_pt bundle; + char *topic; + hash_map_pt msgTypes; + unsigned short getCount; + celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service data structure + bool mp_send_in_progress; + array_list_pt mp_parts; +}* publish_bundle_bound_service_pt; + +/* Note: correct locking order is + * 1. tp_lock + * 2. mp_lock + * 3. socket_lock + * + * tp_lock and socket_lock are independent. + */ + +typedef struct pubsub_msg{ + pubsub_msg_header_pt header; + char* payload; + int payloadSize; +}* pubsub_msg_pt; + +static unsigned int rand_range(unsigned int min, unsigned int max); + +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service); + +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle); +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc); + +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, const void *msg); +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags); +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId); + +static void delay_first_send_for_late_joiners(void); + +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t *best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, topic_publication_pt *out){ + celix_status_t status = CELIX_SUCCESS; + +#ifdef BUILD_WITH_ZMQ_SECURITY + char* secure_topics = NULL; + bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char **) &secure_topics); + + if (secure_topics){ + array_list_pt secure_topics_list = pubsub_getTopicsFromString(secure_topics); + + int i; + int secure_topics_size = arrayList_size(secure_topics_list); + for (i = 0; i < secure_topics_size; i++){ + char* top = arrayList_get(secure_topics_list, i); + if (strcmp(pubEP->topic, top) == 0){ + printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top); + pubEP->is_secure = true; + } + free(top); + top = NULL; + } + + arrayList_destroy(secure_topics_list); + } + + zcert_t* pub_cert = NULL; + if (pubEP->is_secure){ + char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); + if (keys_bundle_dir == NULL){ + return CELIX_SERVICE_EXCEPTION; + } + + const char* keys_file_path = NULL; + const char* keys_file_name = NULL; + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); + bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); + + char cert_path[MAX_CERT_PATH_LENGTH]; + + //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key" + snprintf(cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, pubEP->topic); + free(keys_bundle_dir); + printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path); + + pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, cert_path); + if (pub_cert == NULL){ + printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path); + printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", pubEP->topic); + pubEP->is_secure = false; + } + } +#endif + + zsock_t* socket = zsock_new (ZMQ_PUB); + if(socket==NULL){ + #ifdef BUILD_WITH_ZMQ_SECURITY + if (pubEP->is_secure){ + zcert_destroy(&pub_cert); + } + #endif + + perror("Error for zmq_socket"); + return CELIX_SERVICE_EXCEPTION; + } +#ifdef BUILD_WITH_ZMQ_SECURITY + if (pubEP->is_secure){ + zcert_apply (pub_cert, socket); // apply certificate to socket + zsock_set_curve_server (socket, true); // setup the publisher's socket to use the curve functions + } +#endif + + int rv = -1, retry=0; + char* ep = malloc(EP_ADDRESS_LEN); + char bindAddress[EP_ADDRESS_LEN]; + + while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){ + /* Randomized part due to same bundle publishing on different topics */ + unsigned int port = rand_range(basePort,maxPort); + memset(ep,0,EP_ADDRESS_LEN); + memset(bindAddress, 0, EP_ADDRESS_LEN); + + snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port); - snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind addres than endpoint address ++ snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); //NOTE using a different bind address than endpoint address + rv = zsock_bind (socket, "%s", bindAddress); + if (rv == -1) { + perror("Error for zmq_bind"); + } + retry++; + } + + if(rv == -1){ + free(ep); + return CELIX_SERVICE_EXCEPTION; + } + + /* ZMQ stuffs are all fine at this point. Let's create and initialize the structure */ + + topic_publication_pt pub = calloc(1,sizeof(*pub)); + + arrayList_create(&(pub->pub_ep_list)); + pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL); + celixThreadMutex_create(&(pub->tp_lock),NULL); + + pub->endpoint = ep; + pub->zmq_socket = socket; + pub->serializer = best_serializer; + + celixThreadMutex_create(&(pub->socket_lock),NULL); + +#ifdef BUILD_WITH_ZMQ_SECURITY + if (pubEP->is_secure){ + pub->zmq_cert = pub_cert; + } +#endif + + pubsub_topicPublicationAddPublisherEP(pub,pubEP); + + *out = pub; + + return status; +} + +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&(pub->tp_lock)); + + free(pub->endpoint); + arrayList_destroy(pub->pub_ep_list); + + hash_map_iterator_pt iter = hashMapIterator_create(pub->boundServices); + while(hashMapIterator_hasNext(iter)){ + publish_bundle_bound_service_pt bound = hashMapIterator_nextValue(iter); + pubsub_destroyPublishBundleBoundService(bound); + } + hashMapIterator_destroy(iter); + hashMap_destroy(pub->boundServices,false,false); + + pub->svcFactoryReg = NULL; + pub->serializer = NULL; +#ifdef BUILD_WITH_ZMQ_SECURITY + zcert_destroy(&(pub->zmq_cert)); +#endif + + celixThreadMutex_unlock(&(pub->tp_lock)); + + celixThreadMutex_destroy(&(pub->tp_lock)); + + celixThreadMutex_lock(&(pub->socket_lock)); + zsock_destroy(&(pub->zmq_socket)); + celixThreadMutex_unlock(&(pub->socket_lock)); + + celixThreadMutex_destroy(&(pub->socket_lock)); + + free(pub); + + return status; +} + +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){ + celix_status_t status = CELIX_SUCCESS; + + /* Let's register the new service */ + + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0); + + if(pubEP!=NULL){ + service_factory_pt factory = calloc(1, sizeof(*factory)); + factory->handle = pub; + factory->getService = pubsub_topicPublicationGetService; + factory->ungetService = pubsub_topicPublicationUngetService; + + properties_pt props = properties_create(); - properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic); - properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope); ++ properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); ++ properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE)); + properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION); + + status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg)); + + if(status != CELIX_SUCCESS){ + properties_destroy(props); - printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID); ++ printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n", ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),pubEP->serviceID); + } + else{ + *svcFactory = factory; + } + } + else{ + printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n"); + status = CELIX_SERVICE_EXCEPTION; + } + + return status; +} + +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){ + return serviceRegistration_unregister(pub->svcFactoryReg); +} + +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ + + celixThreadMutex_lock(&(pub->tp_lock)); - ep->endpoint = strdup(pub->endpoint); ++ pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint); + arrayList_add(pub->pub_ep_list,ep); + celixThreadMutex_unlock(&(pub->tp_lock)); + + return CELIX_SUCCESS; +} + +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){ + + celixThreadMutex_lock(&(pub->tp_lock)); + for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) { + pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i); + if(pubsubEndpoint_equals(ep, e)) { + arrayList_removeElement(pub->pub_ep_list,ep); + break; + } + } + celixThreadMutex_unlock(&(pub->tp_lock)); + + return CELIX_SUCCESS; +} + +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){ + array_list_pt list = NULL; + celixThreadMutex_lock(&(pub->tp_lock)); + list = arrayList_clone(pub->pub_ep_list); + celixThreadMutex_unlock(&(pub->tp_lock)); + return list; +} + + +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { + celix_status_t status = CELIX_SUCCESS; + + topic_publication_pt publish = (topic_publication_pt)handle; + + celixThreadMutex_lock(&(publish->tp_lock)); + + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound==NULL){ + bound = pubsub_createPublishBundleBoundService(publish,bundle); + if(bound!=NULL){ + hashMap_put(publish->boundServices,bundle,bound); + } + } + else{ + bound->getCount++; + } + + *service = &bound->service; + + celixThreadMutex_unlock(&(publish->tp_lock)); + + return status; +} + +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) { + + topic_publication_pt publish = (topic_publication_pt)handle; + + celixThreadMutex_lock(&(publish->tp_lock)); + + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle); + if(bound!=NULL){ + + bound->getCount--; + if(bound->getCount==0){ + pubsub_destroyPublishBundleBoundService(bound); + hashMap_remove(publish->boundServices,bundle); + } + + } + else{ + long bundleId = -1; + bundle_getBundleId(bundle,&bundleId); + printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId); + } + + /* service should be never used for unget, so let's set the pointer to NULL */ + *service = NULL; + + celixThreadMutex_unlock(&(publish->tp_lock)); + + return CELIX_SUCCESS; +} + +static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){ + + bool ret = true; + + zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header)); + if (headerMsg == NULL) ret=false; + zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize); + if (payloadMsg == NULL) ret=false; + + delay_first_send_for_late_joiners(); + + if( zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; + + if(!last){ + if( zframe_send(&payloadMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false; + } + else{ + if( zframe_send(&payloadMsg,zmq_socket, 0) == -1) ret=false; + } + + if (!ret){ + zframe_destroy(&headerMsg); + zframe_destroy(&payloadMsg); + } + + free(msg->header); + free(msg->payload); + free(msg); + + return ret; + +} + +static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){ + + bool ret = true; + + unsigned int i = 0; + unsigned int mp_num = arrayList_size(mp_msg_parts); + for(;i<mp_num;i++){ + ret = ret && send_pubsub_msg(zmq_socket, (pubsub_msg_pt)arrayList_get(mp_msg_parts,i), (i==mp_num-1)); + } + arrayList_clear(mp_msg_parts); + + return ret; + +} + +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) { + + return pubsub_topicPublicationSendMultipart(handle,msgTypeId,msg, PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG); + +} + +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){ + + int status = 0; + + publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle; + + celixThreadMutex_lock(&(bound->parent->tp_lock)); + celixThreadMutex_lock(&(bound->mp_lock)); + if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg + printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n"); + celixThreadMutex_unlock(&(bound->mp_lock)); + celixThreadMutex_unlock(&(bound->parent->tp_lock)); + return -3; + } + + pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId); + + if (msgSer!= NULL) { + int major=0, minor=0; + + pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header)); + strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1); + msg_hdr->type = msgTypeId; + + if (msgSer->msgVersion != NULL){ + version_getMajor(msgSer->msgVersion, &major); + version_getMinor(msgSer->msgVersion, &minor); + msg_hdr->major = major; + msg_hdr->minor = minor; + } + + void *serializedOutput = NULL; + size_t serializedOutputLen = 0; + msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen); + + pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg)); + msg->header = msg_hdr; + msg->payload = (char*)serializedOutput; + msg->payloadSize = serializedOutputLen; + bool snd = true; + + switch(flags){ + case PUBSUB_PUBLISHER_FIRST_MSG: + bound->mp_send_in_progress = true; + arrayList_add(bound->mp_parts,msg); + break; + case PUBSUB_PUBLISHER_PART_MSG: + if(!bound->mp_send_in_progress){ + printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n"); + status = -4; + } + else{ + arrayList_add(bound->mp_parts,msg); + } + break; + case PUBSUB_PUBLISHER_LAST_MSG: + if(!bound->mp_send_in_progress){ + printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n"); + status = -4; + } + else{ + arrayList_add(bound->mp_parts,msg); + snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts); + bound->mp_send_in_progress = false; + } + break; + case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG: //Normal send case + snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true); + break; + default: + printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n"); + status = -4; + break; + } + + if(status==-4){ + free(msg); + } + + if(!snd){ + printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId); + } + + } else { + printf("PSA_ZMQ_TP: No msg serializer available for msg type id %d\n", msgTypeId); + status=-1; + } + + celixThreadMutex_unlock(&(bound->mp_lock)); + celixThreadMutex_unlock(&(bound->parent->tp_lock)); + + return status; + +} + +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){ + *msgTypeId = utils_stringHash(msgType); + return 0; +} + + +static unsigned int rand_range(unsigned int min, unsigned int max){ + + double scaled = (double)(((double)random())/((double)RAND_MAX)); + return (max-min+1)*scaled + min; + +} + +static publish_bundle_bound_service_pt pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt bundle){ + + //PRECOND lock on tp->lock + + publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound)); + + if (bound != NULL) { + + bound->parent = tp; + bound->bundle = bundle; + bound->getCount = 1; + bound->mp_send_in_progress = false; + celixThreadMutex_create(&bound->mp_lock,NULL); + + if(tp->serializer != NULL){ + tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes); + } + + arrayList_create(&bound->mp_parts); + + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0); - bound->topic=strdup(pubEP->topic); ++ bound->topic=strdup(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + + bound->service.handle = bound; + bound->service.localMsgTypeIdForMsgType = pubsub_localMsgTypeIdForUUID; + bound->service.send = pubsub_topicPublicationSend; + bound->service.sendMultipart = pubsub_topicPublicationSendMultipart; + + } + + return bound; +} + +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){ + + //PRECOND lock on tp->lock + + celixThreadMutex_lock(&boundSvc->mp_lock); + + + if(boundSvc->parent->serializer != NULL && boundSvc->msgTypes != NULL){ + boundSvc->parent->serializer->destroySerializerMap(boundSvc->parent->serializer->handle, boundSvc->msgTypes); + } + + if(boundSvc->mp_parts!=NULL){ + arrayList_destroy(boundSvc->mp_parts); + } + + if(boundSvc->topic!=NULL){ + free(boundSvc->topic); + } + + celixThreadMutex_unlock(&boundSvc->mp_lock); + celixThreadMutex_destroy(&boundSvc->mp_lock); + + free(boundSvc); + +} + +static void delay_first_send_for_late_joiners(){ + + static bool firstSend = true; + + if(firstSend){ + printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n"); + sleep(FIRST_SEND_DELAY); + firstSend = false; + } +} http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/etcd_watcher.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_discovery/src/etcd_watcher.c index 3c3a5a8,0000000..726269a mode 100644,000000..100644 --- a/pubsub/pubsub_discovery/src/etcd_watcher.c +++ b/pubsub/pubsub_discovery/src/etcd_watcher.c @@@ -1,290 -1,0 +1,344 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <stdbool.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> ++#include <jansson.h> + +#include "celix_log.h" +#include "constants.h" + +#include "etcd.h" +#include "etcd_watcher.h" + +#include "pubsub_discovery.h" +#include "pubsub_discovery_impl.h" + + + +#define MAX_ROOTNODE_LENGTH 128 +#define MAX_LOCALNODE_LENGTH 4096 +#define MAX_FIELD_LENGTH 128 + +#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH" +#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery" + +#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP" +#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" + +#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" +#define DEFAULT_ETCD_SERVER_PORT 2379 + +// be careful - this should be higher than the curl timeout +#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" +#define DEFAULT_ETCD_TTL 30 + + +struct etcd_watcher { + pubsub_discovery_pt pubsub_discovery; + + celix_thread_mutex_t watcherLock; + celix_thread_t watcherThread; + + char *scope; + char *topic; + volatile bool running; +}; + +struct etcd_writer { + pubsub_discovery_pt pubsub_discovery; + celix_thread_mutex_t localPubsLock; + array_list_pt localPubs; + volatile bool running; + celix_thread_t writerThread; +}; + + +// note that the rootNode shouldn't have a leading slash +static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) { + celix_status_t status = CELIX_SUCCESS; + const char* rootPath = NULL; + + if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { + snprintf(rootNode, rootNodeLen, "%s/%s/%s", DEFAULT_ETCD_ROOTPATH, scope, topic); + } else { + snprintf(rootNode, rootNodeLen, "%s/%s/%s", rootPath, scope, topic); + } + + return status; +} + +static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) { + celix_status_t status = CELIX_SUCCESS; + const char* rootPath = NULL; + + if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) { + strncpy(rootNode, DEFAULT_ETCD_ROOTPATH, MAX_ROOTNODE_LENGTH); + } else { + strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH); + } + + return status; +} + + +static void add_node(const char *key, const char *value, void* arg) { + pubsub_discovery_pt ps_discovery = (pubsub_discovery_pt) arg; + pubsub_endpoint_pt pubEP = NULL; + celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(ps_discovery, key, value, &pubEP); + if(!status && pubEP) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } +} + +static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) { + celix_status_t status = CELIX_SUCCESS; + if(etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)) { + status = CELIX_ILLEGAL_ARGUMENT; + } + return status; +} + +// gets everything from provided key +celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) { + + celix_status_t status = CELIX_SUCCESS; + + char rootPath[MAX_ROOTNODE_LENGTH]; + char *expr = NULL; + char scope[MAX_FIELD_LENGTH]; + char topic[MAX_FIELD_LENGTH]; + char fwUUID[MAX_FIELD_LENGTH]; + char serviceId[MAX_FIELD_LENGTH]; + + memset(rootPath,0,MAX_ROOTNODE_LENGTH); + memset(topic,0,MAX_FIELD_LENGTH); + memset(fwUUID,0,MAX_FIELD_LENGTH); + memset(serviceId,0,MAX_FIELD_LENGTH); + + etcdWatcher_getRootPath(pubsub_discovery->context, rootPath); + + asprintf(&expr, "/%s/%%[^/]/%%[^/]/%%[^/]/%%[^/].*", rootPath); + if(expr) { + int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId); + free(expr); + if (foundItems != 4) { // Could happen when a directory is removed, just don't process this. + status = CELIX_ILLEGAL_STATE; + } + else{ - status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP); ++ ++ // etcdValue contains the json formatted string ++ json_error_t error; ++ json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error); ++ ++ const char* endpoint_serializer = NULL; ++ const char* endpoint_admin_type = NULL; ++ const char* endpoint_url = NULL; ++ const char* endpoint_type = NULL; ++ ++ if (json_is_object(jsonRoot)){ ++ ++ void *iter = json_object_iter(jsonRoot); ++ ++ const char *key; ++ json_t *value; ++ ++ while (iter) { ++ key = json_object_iter_key(iter); ++ value = json_object_iter_value(iter); ++ ++ if (strcmp(key, PUBSUB_ENDPOINT_SERIALIZER) == 0) { ++ endpoint_serializer = json_string_value(value); ++ } else if (strcmp(key, PUBSUB_ENDPOINT_ADMIN_TYPE) == 0) { ++ endpoint_admin_type = json_string_value(value); ++ } else if (strcmp(key, PUBSUB_ENDPOINT_URL) == 0) { ++ endpoint_url = json_string_value(value); ++ } else if (strcmp(key, PUBSUB_ENDPOINT_TYPE) == 0) { ++ endpoint_type = json_string_value(value); ++ } ++ ++ iter = json_object_iter_next(jsonRoot, iter); ++ } ++ ++ if (endpoint_url == NULL) { ++ printf("EW: No endpoint found in json object!\n"); ++ endpoint_url = etcdValue; ++ } ++ ++ } else { ++ endpoint_url = etcdValue; ++ } ++ ++ status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),endpoint_url,NULL,pubEP); ++ ++ if (status == CELIX_SUCCESS) { ++ status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_SERIALIZER, endpoint_serializer); ++ status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_ADMIN_TYPE, endpoint_admin_type); ++ status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_TYPE, endpoint_type); ++ } ++ ++ if (jsonRoot != NULL) { ++ json_decref(jsonRoot); ++ } + } + } + return status; +} + +/* + * performs (blocking) etcd_watch calls to check for + * changing discovery endpoint information within etcd. + */ +static void* etcdWatcher_run(void* data) { + etcd_watcher_pt watcher = (etcd_watcher_pt) data; + time_t timeBeforeWatch = time(NULL); + char rootPath[MAX_ROOTNODE_LENGTH]; + long long highestModified = 0; + + pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery; + bundle_context_pt context = ps_discovery->context; + + memset(rootPath, 0, MAX_ROOTNODE_LENGTH); + + //TODO: add topic to etcd key + etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); + etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, &highestModified); + + while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) && watcher->running) { + + char *rkey = NULL; + char *value = NULL; + char *preValue = NULL; + char *action = NULL; + long long modIndex; + + celixThreadMutex_unlock(&watcher->watcherLock); + + if (etcd_watch(rootPath, highestModified + 1, &action, &preValue, &value, &rkey, &modIndex) == 0 && action != NULL) { + pubsub_endpoint_pt pubEP = NULL; + if ((strcmp(action, "set") == 0) || (strcmp(action, "create") == 0)) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "delete") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_removeNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "expire") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_removeNode(ps_discovery, pubEP); + } + } else if (strcmp(action, "update") == 0) { + if (etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == CELIX_SUCCESS) { + pubsub_discovery_addNode(ps_discovery, pubEP); + } + } else { + fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, "Unexpected action: %s", action); + } + highestModified = modIndex; + } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 4)) { + sleep(DEFAULT_ETCD_TTL / 4); + } + + FREE_MEM(action); + FREE_MEM(value); + FREE_MEM(preValue); + FREE_MEM(rkey); + + /* prevent busy waiting, in case etcd_watch returns false */ + + + if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) { + timeBeforeWatch = time(NULL); + } + + } + + if (watcher->running == false) { + celixThreadMutex_unlock(&watcher->watcherLock); + } + + return NULL; +} + +celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) { + celix_status_t status = CELIX_SUCCESS; + + + if (pubsub_discovery == NULL) { + return CELIX_BUNDLE_EXCEPTION; + } + + (*watcher) = calloc(1, sizeof(struct etcd_watcher)); + + if(*watcher == NULL){ + return CELIX_ENOMEM; + } + + (*watcher)->pubsub_discovery = pubsub_discovery; + (*watcher)->scope = strdup(scope); + (*watcher)->topic = strdup(topic); + + + celixThreadMutex_create(&(*watcher)->watcherLock, NULL); + + celixThreadMutex_lock(&(*watcher)->watcherLock); + + status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher); + if (status == CELIX_SUCCESS) { + (*watcher)->running = true; + } + + celixThreadMutex_unlock(&(*watcher)->watcherLock); + + + return status; +} + +celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) { + + celix_status_t status = CELIX_SUCCESS; + + char rootPath[MAX_ROOTNODE_LENGTH]; + etcdWatcher_getTopicRootPath(watcher->pubsub_discovery->context, watcher->scope, watcher->topic, rootPath, MAX_ROOTNODE_LENGTH); + celixThreadMutex_destroy(&(watcher->watcherLock)); + + free(watcher->scope); + free(watcher->topic); + free(watcher); + + return status; +} + +celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){ + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&(watcher->watcherLock)); + watcher->running = false; + celixThreadMutex_unlock(&(watcher->watcherLock)); + + celixThread_join(watcher->watcherThread, NULL); + + return status; + +} + + http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/etcd_writer.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_discovery/src/etcd_writer.c index 1c423f3,0000000..e820e50 mode 100644,000000..100644 --- a/pubsub/pubsub_discovery/src/etcd_writer.c +++ b/pubsub/pubsub_discovery/src/etcd_writer.c @@@ -1,189 -1,0 +1,221 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <stdbool.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> ++#include <jansson.h> + +#include "celix_log.h" +#include "constants.h" + +#include "etcd.h" +#include "etcd_writer.h" + +#include "pubsub_discovery.h" +#include "pubsub_discovery_impl.h" + +#define MAX_ROOTNODE_LENGTH 128 + +#define CFG_ETCD_ROOT_PATH "PUBSUB_DISCOVERY_ETCD_ROOT_PATH" +#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery" + +#define CFG_ETCD_SERVER_IP "PUBSUB_DISCOVERY_ETCD_SERVER_IP" +#define DEFAULT_ETCD_SERVER_IP "127.0.0.1" + +#define CFG_ETCD_SERVER_PORT "PUBSUB_DISCOVERY_ETCD_SERVER_PORT" +#define DEFAULT_ETCD_SERVER_PORT 2379 + +// be careful - this should be higher than the curl timeout +#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL" +#define DEFAULT_ETCD_TTL 30 + +struct etcd_writer { + pubsub_discovery_pt pubsub_discovery; + celix_thread_mutex_t localPubsLock; + array_list_pt localPubs; + volatile bool running; + celix_thread_t writerThread; +}; + + +static const char* etcdWriter_getRootPath(bundle_context_pt context); +static void* etcdWriter_run(void* data); + + +etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) { + etcd_writer_pt writer = calloc(1, sizeof(*writer)); + if(writer) { + celixThreadMutex_create(&writer->localPubsLock, NULL); + arrayList_create(&writer->localPubs); + writer->pubsub_discovery = disc; + writer->running = true; + celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer); + } + return writer; +} + +void etcdWriter_destroy(etcd_writer_pt writer) { + char dir[MAX_ROOTNODE_LENGTH]; + const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); + + writer->running = false; + celixThread_join(writer->writerThread, NULL); + + celixThreadMutex_lock(&writer->localPubsLock); + for(int i = 0; i < arrayList_size(writer->localPubs); i++) { + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i); + memset(dir,0,MAX_ROOTNODE_LENGTH); - snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID); ++ snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s", ++ rootPath, ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID)); ++ + etcd_del(dir); + pubsubEndpoint_destroy(pubEP); + } + arrayList_destroy(writer->localPubs); + + celixThreadMutex_unlock(&writer->localPubsLock); + celixThreadMutex_destroy(&(writer->localPubsLock)); + + free(writer); +} + +celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){ + celix_status_t status = CELIX_BUNDLE_EXCEPTION; + + if(storeEP){ + const char *fwUUID = NULL; + bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); - if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) { ++ if(fwUUID && strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) { + celixThreadMutex_lock(&writer->localPubsLock); + pubsub_endpoint_pt p = NULL; + pubsubEndpoint_clone(pubEP, &p); + arrayList_add(writer->localPubs,p); + celixThreadMutex_unlock(&writer->localPubsLock); + } + } + + char *key; + + const char* ttlStr = NULL; + int ttl = 0; + + // determine ttl + if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) { + ttl = DEFAULT_ETCD_TTL; + } else { + char* endptr = NULL; + errno = 0; + ttl = strtol(ttlStr, &endptr, 10); + if (*endptr || errno != 0) { + ttl = DEFAULT_ETCD_TTL; + } + } + + const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); + - asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID); - - if(!etcd_set(key,pubEP->endpoint,ttl,false)){ ++ asprintf(&key,"%s/%s/%s/%s/%ld", ++ rootPath, ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ pubEP->serviceID); ++ ++ char serviceID [sizeof(pubEP->serviceID)]; ++ snprintf(serviceID, sizeof(pubEP->serviceID), "%ld", pubEP->serviceID); ++ json_t* jsonEndpoint = json_pack("{s:s,s:s,s:s,s:s,s:s,s:s,s:s}", ++ PUBSUB_ENDPOINT_SERVICE_ID, serviceID, ++ PUBSUB_ENDPOINT_SERIALIZER, "serializer.json", //TODO: Serializer not (yet) stored in endpoint ++ PUBSUB_ENDPOINT_ADMIN_TYPE, "zmq", //TODO: PSA type not (yet) stored in endpoint ++ PUBSUB_ENDPOINT_URL, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL), ++ PUBSUB_ENDPOINT_TYPE, "publisher", //TODO: Check if necessary ++ PUBSUB_ENDPOINT_TOPIC, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ PUBSUB_ENDPOINT_SCOPE, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE) ++ ); ++ char* jsonEndpointStr = json_dumps(jsonEndpoint, JSON_COMPACT); ++ ++ if (!etcd_set(key,jsonEndpointStr,ttl,false)) { + status = CELIX_ILLEGAL_ARGUMENT; + } + FREE_MEM(key); ++ FREE_MEM(jsonEndpointStr); ++ json_decref(jsonEndpoint); ++ + return status; +} + +celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; + char *key = NULL; + + const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context); + - asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID); ++ asprintf(&key, "%s/%s/%s/%s/%ld", ++ rootPath, ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ pubEP->serviceID); + + celixThreadMutex_lock(&writer->localPubsLock); + for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) { + pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i); + if (pubsubEndpoint_equals(ep, pubEP)) { + arrayList_remove(writer->localPubs, i); + pubsubEndpoint_destroy(ep); + break; + } + } + celixThreadMutex_unlock(&writer->localPubsLock); + + if (etcd_del(key)) { + printf("Failed to remove key %s from ETCD\n",key); + status = CELIX_ILLEGAL_ARGUMENT; + } + FREE_MEM(key); + return status; +} + +static void* etcdWriter_run(void* data) { + etcd_writer_pt writer = (etcd_writer_pt)data; + while(writer->running) { + celixThreadMutex_lock(&writer->localPubsLock); + for(int i=0; i < arrayList_size(writer->localPubs); i++) { + etcdWriter_addPublisherEndpoint(writer,(pubsub_endpoint_pt)arrayList_get(writer->localPubs,i),false); + } + celixThreadMutex_unlock(&writer->localPubsLock); + sleep(DEFAULT_ETCD_TTL / 2); + } + + return NULL; +} + +static const char* etcdWriter_getRootPath(bundle_context_pt context) { + const char* rootPath = NULL; + bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath); + if(rootPath == NULL) { + rootPath = DEFAULT_ETCD_ROOTPATH; + } + return rootPath; +} +