http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
index 2dcec25,0000000..9929437
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
+++ b/pubsub/pubsub_admin_zmq/src/pubsub_admin_impl.c
@@@ -1,1040 -1,0 +1,1059 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +/*
 + * pubsub_admin_impl.c
 + *
 + *  \date       Sep 30, 2011
 + *  \author           <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a>
 + *  \copyright        Apache License, Version 2.0
 + */
 +
 +#include "pubsub_admin_impl.h"
 +#include <zmq.h>
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +
 +#include <arpa/inet.h>
 +#include <sys/socket.h>
 +#include <netdb.h>
 +
 +#ifndef ANDROID
 +#include <ifaddrs.h>
 +#endif
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
 +
 +#include "constants.h"
 +#include "utils.h"
 +#include "hash_map.h"
 +#include "array_list.h"
 +#include "bundle_context.h"
 +#include "bundle.h"
 +#include "service_reference.h"
 +#include "service_registration.h"
 +#include "log_helper.h"
 +#include "log_service.h"
 +#include "celix_threads.h"
 +#include "service_factory.h"
 +
 +#include "topic_subscription.h"
 +#include "topic_publication.h"
 +#include "pubsub_endpoint.h"
 +#include "pubsub_utils.h"
 +#include "pubsub/subscriber.h"
 +
 +#define MAX_KEY_FOLDER_PATH_LENGTH 512
 +
 +static const char *DEFAULT_IP = "127.0.0.1";
 +
 +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** 
ip);
 +static celix_status_t 
pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP);
 +
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc);
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication);
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication);
 +
 +celix_status_t pubsubAdmin_create(bundle_context_pt context, pubsub_admin_pt 
*admin) {
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +      if (!zsys_has_curve()){
 +              printf("PSA_ZMQ: zeromq curve unsupported\n");
 +              return CELIX_SERVICE_EXCEPTION;
 +      }
 +#endif
 +
 +      *admin = calloc(1, sizeof(**admin));
 +
 +      if (!*admin) {
 +              status = CELIX_ENOMEM;
 +      }
 +      else{
 +
 +              const char *ip = NULL;
 +              char *detectedIp = NULL;
 +              (*admin)->bundle_context= context;
 +              (*admin)->localPublications = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
 +              (*admin)->subscriptions = hashMap_create(utils_stringHash, 
NULL, utils_stringEquals, NULL);
 +              (*admin)->pendingSubscriptions = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +              (*admin)->externalPublications = 
hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 +              (*admin)->topicSubscriptionsPerSerializer = 
hashMap_create(NULL, NULL, NULL, NULL);
 +              (*admin)->topicPublicationsPerSerializer  = 
hashMap_create(NULL, NULL, NULL, NULL);
 +              arrayList_create(&((*admin)->noSerializerSubscriptions));
 +              arrayList_create(&((*admin)->noSerializerPublications));
 +              arrayList_create(&((*admin)->serializerList));
 +
 +              celixThreadMutex_create(&(*admin)->localPublicationsLock, NULL);
 +              celixThreadMutex_create(&(*admin)->subscriptionsLock, NULL);
 +              celixThreadMutex_create(&(*admin)->externalPublicationsLock, 
NULL);
 +              celixThreadMutex_create(&(*admin)->serializerListLock, NULL);
 +              celixThreadMutex_create(&(*admin)->usedSerializersLock, NULL);
 +
 +              
celixThreadMutexAttr_create(&(*admin)->noSerializerPendingsAttr);
 +              
celixThreadMutexAttr_settype(&(*admin)->noSerializerPendingsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
 +              celixThreadMutex_create(&(*admin)->noSerializerPendingsLock, 
&(*admin)->noSerializerPendingsAttr);
 +
 +              
celixThreadMutexAttr_create(&(*admin)->pendingSubscriptionsAttr);
 +              
celixThreadMutexAttr_settype(&(*admin)->pendingSubscriptionsAttr, 
CELIX_THREAD_MUTEX_RECURSIVE);
 +              celixThreadMutex_create(&(*admin)->pendingSubscriptionsLock, 
&(*admin)->pendingSubscriptionsAttr);
 +
 +              if (logHelper_create(context, &(*admin)->loghelper) == 
CELIX_SUCCESS) {
 +                      logHelper_start((*admin)->loghelper);
 +              }
 +
 +              bundleContext_getProperty(context,PSA_IP , &ip);
 +
 +#ifndef ANDROID
 +              if (ip == NULL) {
 +                      const char *interface = NULL;
 +
 +                      bundleContext_getProperty(context, PSA_ITF, &interface);
 +                      if (pubsubAdmin_getIpAdress(interface, &detectedIp) != 
CELIX_SUCCESS) {
 +                              logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: Could not retrieve IP adress for interface 
%s", interface);
 +                      }
 +
 +                      ip = detectedIp;
 +              }
 +#endif
 +
 +              if (ip != NULL) {
 +                      logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %s for service annunciation", ip);
 +                      (*admin)->ipAddress = strdup(ip);
 +              }
 +              else {
 +                      logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_WARNING, "PSA_ZMQ: No IP address for service annunciation set. 
Using %s", DEFAULT_IP);
 +                      (*admin)->ipAddress = strdup(DEFAULT_IP);
 +              }
 +
 +              if (detectedIp != NULL) {
 +                      free(detectedIp);
 +              }
 +
 +              const char* basePortStr = NULL;
 +              const char* maxPortStr = NULL;
 +              char* endptrBase = NULL;
 +              char* endptrMax = NULL;
 +              bundleContext_getPropertyWithDefault(context, 
PSA_ZMQ_BASE_PORT, "PSA_ZMQ_DEFAULT_BASE_PORT", &basePortStr);
 +              bundleContext_getPropertyWithDefault(context, PSA_ZMQ_MAX_PORT, 
"PSA_ZMQ_DEFAULT_MAX_PORT", &maxPortStr);
 +              (*admin)->basePort = strtol(basePortStr, &endptrBase, 10);
 +              (*admin)->maxPort = strtol(maxPortStr, &endptrMax, 10);
 +              if (*endptrBase != '\0') {
 +                      (*admin)->basePort = PSA_ZMQ_DEFAULT_BASE_PORT;
 +              }
 +              if (*endptrMax != '\0') {
 +                      (*admin)->maxPort = PSA_ZMQ_DEFAULT_MAX_PORT;
 +              }
 +
 +              printf("PSA Using base port %u to max port %u\n", 
(*admin)->basePort, (*admin)->maxPort);
 +
 +              // Disable Signal Handling by CZMQ
 +              setenv("ZSYS_SIGHANDLER", "false", true);
 +
 +              const char *nrZmqThreads = NULL;
 +              bundleContext_getProperty(context, "PSA_NR_ZMQ_THREADS", 
&nrZmqThreads);
 +
 +              if(nrZmqThreads != NULL) {
 +                      char *endPtr = NULL;
 +                      unsigned int nrThreads = strtoul(nrZmqThreads, &endPtr, 
10);
 +                      if(endPtr != nrZmqThreads && nrThreads > 0 && nrThreads 
< 50) {
 +                              zsys_set_io_threads(nrThreads);
 +                              logHelper_log((*admin)->loghelper, 
OSGI_LOGSERVICE_INFO, "PSA_ZMQ: Using %d threads for ZMQ", nrThreads);
 +                              printf("PSA_ZMQ: Using %d threads for ZMQ\n", 
nrThreads);
 +                      }
 +              }
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +              // Setup authenticator
 +              zactor_t* auth = zactor_new (zauth, NULL);
 +              zstr_sendx(auth, "VERBOSE", NULL);
 +
 +              // Load all public keys of subscribers into the application
 +              // This step is done for authenticating subscribers
 +              char curve_folder_path[MAX_KEY_FOLDER_PATH_LENGTH];
 +              char* keys_bundle_dir = pubsub_getKeysBundleDir(context);
 +              snprintf(curve_folder_path, MAX_KEY_FOLDER_PATH_LENGTH, 
"%s/META-INF/keys/subscriber/public", keys_bundle_dir);
 +              zstr_sendx (auth, "CURVE", curve_folder_path, NULL);
 +              free(keys_bundle_dir);
 +
 +              (*admin)->zmq_auth = auth;
 +#endif
 +
 +      }
 +
 +      return status;
 +}
 +
 +
 +celix_status_t pubsubAdmin_destroy(pubsub_admin_pt admin)
 +{
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      free(admin->ipAddress);
 +
 +      celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +      hash_map_iterator_pt iter = 
hashMapIterator_create(admin->pendingSubscriptions);
 +      while(hashMapIterator_hasNext(iter)){
 +              hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +              free((char*)hashMapEntry_getKey(entry));
 +              arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
 +      }
 +      hashMapIterator_destroy(iter);
 +      hashMap_destroy(admin->pendingSubscriptions,false,false);
 +      celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +      celixThreadMutex_lock(&admin->subscriptionsLock);
 +      hashMap_destroy(admin->subscriptions,false,false);
 +      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +      celixThreadMutex_lock(&admin->localPublicationsLock);
 +      hashMap_destroy(admin->localPublications,true,false);
 +      celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +      celixThreadMutex_lock(&admin->externalPublicationsLock);
 +      iter = hashMapIterator_create(admin->externalPublications);
 +      while(hashMapIterator_hasNext(iter)){
 +              hash_map_entry_pt entry = hashMapIterator_nextEntry(iter);
 +              free((char*)hashMapEntry_getKey(entry));
 +              arrayList_destroy((array_list_pt)hashMapEntry_getValue(entry));
 +      }
 +      hashMapIterator_destroy(iter);
 +      hashMap_destroy(admin->externalPublications,false,false);
 +      celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +      celixThreadMutex_lock(&admin->serializerListLock);
 +      arrayList_destroy(admin->serializerList);
 +      celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +      celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +      arrayList_destroy(admin->noSerializerSubscriptions);
 +      arrayList_destroy(admin->noSerializerPublications);
 +      celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +
 +      celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +      iter = hashMapIterator_create(admin->topicSubscriptionsPerSerializer);
 +      while(hashMapIterator_hasNext(iter)){
 +              
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
 +      }
 +      hashMapIterator_destroy(iter);
 +      hashMap_destroy(admin->topicSubscriptionsPerSerializer,false,false);
 +
 +      iter = hashMapIterator_create(admin->topicPublicationsPerSerializer);
 +      while(hashMapIterator_hasNext(iter)){
 +              
arrayList_destroy((array_list_pt)hashMapIterator_nextValue(iter));
 +      }
 +      hashMapIterator_destroy(iter);
 +      hashMap_destroy(admin->topicPublicationsPerSerializer,false,false);
 +
 +      celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +      celixThreadMutex_destroy(&admin->usedSerializersLock);
 +      celixThreadMutex_destroy(&admin->serializerListLock);
 +      celixThreadMutex_destroy(&admin->pendingSubscriptionsLock);
 +
 +      celixThreadMutexAttr_destroy(&admin->noSerializerPendingsAttr);
 +      celixThreadMutex_destroy(&admin->noSerializerPendingsLock);
 +
 +      celixThreadMutexAttr_destroy(&admin->pendingSubscriptionsAttr);
 +      celixThreadMutex_destroy(&admin->subscriptionsLock);
 +
 +      celixThreadMutex_destroy(&admin->localPublicationsLock);
 +      celixThreadMutex_destroy(&admin->externalPublicationsLock);
 +
 +      logHelper_stop(admin->loghelper);
 +
 +      logHelper_destroy(&admin->loghelper);
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +      if (admin->zmq_auth != NULL){
 +              zactor_destroy(&(admin->zmq_auth));
 +      }
 +#endif
 +
 +      free(admin);
 +
 +      return status;
 +}
 +
 +static celix_status_t pubsubAdmin_addAnySubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +      topic_subscription_pt any_sub = 
hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
 +
 +      if(any_sub==NULL){
 +
 +              int i;
 +              pubsub_serializer_service_t *best_serializer = NULL;
 +              if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
 +                      status = 
pubsub_topicSubscriptionCreate(admin->bundle_context, 
PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, PUBSUB_ANY_SUB_TOPIC, best_serializer, 
&any_sub);
 +              }
 +              else{
-                       printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
++                      printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",
++                                 properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +                      celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +                      arrayList_add(admin->noSerializerSubscriptions,subEP);
 +                      
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +              }
 +
 +              if (status == CELIX_SUCCESS){
 +
 +                      /* Connect all internal publishers */
 +                      celixThreadMutex_lock(&admin->localPublicationsLock);
 +                      hash_map_iterator_pt lp_iter 
=hashMapIterator_create(admin->localPublications);
 +                      while(hashMapIterator_hasNext(lp_iter)){
 +                              service_factory_pt factory = 
(service_factory_pt)hashMapIterator_nextValue(lp_iter);
 +                              topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
 +                              array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
 +
 +                              if(topic_publishers!=NULL){
 +                                      
for(i=0;i<arrayList_size(topic_publishers);i++){
 +                                              pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
++                                              
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++                                                      status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +                                              }
 +                                      }
 +                                      arrayList_destroy(topic_publishers);
 +                              }
 +                      }
 +                      hashMapIterator_destroy(lp_iter);
 +                      celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +                      /* Connect also all external publishers */
 +                      celixThreadMutex_lock(&admin->externalPublicationsLock);
 +                      hash_map_iterator_pt extp_iter 
=hashMapIterator_create(admin->externalPublications);
 +                      while(hashMapIterator_hasNext(extp_iter)){
 +                              array_list_pt ext_pub_list = 
(array_list_pt)hashMapIterator_nextValue(extp_iter);
 +                              if(ext_pub_list!=NULL){
 +                                      
for(i=0;i<arrayList_size(ext_pub_list);i++){
 +                                              pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(any_sub,pubEP->endpoint);
++                                              
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++                                                      status += 
pubsub_topicSubscriptionConnectPublisher(any_sub, (char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +                                              }
 +                                      }
 +                              }
 +                      }
 +                      hashMapIterator_destroy(extp_iter);
 +                      
celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +
 +
 +                      pubsub_topicSubscriptionAddSubscriber(any_sub,subEP);
 +
 +                      status += pubsub_topicSubscriptionStart(any_sub);
 +
 +              }
 +
 +              if (status == CELIX_SUCCESS){
 +                      
hashMap_put(admin->subscriptions,strdup(PUBSUB_ANY_SUB_TOPIC),any_sub);
 +                      connectTopicPubSubToSerializer(admin, best_serializer, 
any_sub, false);
 +              }
 +
 +      }
 +
 +      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +      return status;
 +}
 +
 +celix_status_t pubsubAdmin_addSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
 +      celix_status_t status = CELIX_SUCCESS;
 +
-       printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->scope, 
subEP->topic);
++      printf("PSA_ZMQ: Received subscription [FWUUID=%s bundleID=%ld 
scope=%s, topic=%s]\n",
++                 properties_get(subEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                 subEP->serviceID,
++                 properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++                 properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
-       if(strcmp(subEP->topic,PUBSUB_ANY_SUB_TOPIC)==0){
++      if(strcmp(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC),PUBSUB_ANY_SUB_TOPIC)==0){
 +              return pubsubAdmin_addAnySubscription(admin,subEP);
 +      }
 +
 +      /* Check if we already know some publisher about this topic, otherwise 
let's put the subscription in the pending hashmap */
 +      celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +      celixThreadMutex_lock(&admin->subscriptionsLock);
 +      celixThreadMutex_lock(&admin->localPublicationsLock);
 +      celixThreadMutex_lock(&admin->externalPublicationsLock);
 +
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
++      char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
 +      service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +      array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +
 +      if(factory==NULL && ext_pub_list==NULL){ //No (local or external) 
publishers yet for this topic
 +              pubsubAdmin_addSubscriptionToPendingList(admin,subEP);
 +      }
 +      else{
 +              int i;
 +              topic_subscription_pt subscription = 
hashMap_get(admin->subscriptions, scope_topic);
 +
 +              if(subscription == NULL) {
 +                      pubsub_serializer_service_t *best_serializer = NULL;
 +                      if( (status=pubsubAdmin_getBestSerializer(admin, subEP, 
&best_serializer)) == CELIX_SUCCESS){
-                               status += 
pubsub_topicSubscriptionCreate(admin->bundle_context,subEP->scope, 
subEP->topic, best_serializer, &subscription);
++                              status += 
pubsub_topicSubscriptionCreate(admin->bundle_context, (char*) 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) 
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), best_serializer, 
&subscription);
 +                      }
 +                      else{
-                               printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",subEP->topic);
++                              printf("PSA_ZMQ: Cannot find a serializer for 
subscribing topic %s. Adding it to pending list.\n",
++                                         
properties_get(subEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +                              
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +                              
arrayList_add(admin->noSerializerSubscriptions,subEP);
 +                              
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +                      }
 +
 +                      if (status==CELIX_SUCCESS){
 +
 +                              /* Try to connect internal publishers */
 +                              if(factory!=NULL){
 +                                      topic_publication_pt topic_pubs = 
(topic_publication_pt)factory->handle;
 +                                      array_list_pt topic_publishers = 
pubsub_topicPublicationGetPublisherList(topic_pubs);
 +
 +                                      if(topic_publishers!=NULL){
 +                                              
for(i=0;i<arrayList_size(topic_publishers);i++){
 +                                                      pubsub_endpoint_pt 
pubEP = (pubsub_endpoint_pt)arrayList_get(topic_publishers,i);
-                                                       if(pubEP->endpoint 
!=NULL){
-                                                               status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
++                                                      
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++                                                              status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*)properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_URL));
 +                                                      }
 +                                              }
 +                                              
arrayList_destroy(topic_publishers);
 +                                      }
 +
 +                              }
 +
 +                              /* Look also for external publishers */
 +                              if(ext_pub_list!=NULL){
 +                                      
for(i=0;i<arrayList_size(ext_pub_list);i++){
 +                                              pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                                               if(pubEP->endpoint !=NULL){
-                                                       status += 
pubsub_topicSubscriptionConnectPublisher(subscription,pubEP->endpoint);
++                                              
if(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL) !=NULL){
++                                                      status += 
pubsub_topicSubscriptionConnectPublisher(subscription,(char*) 
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +                                              }
 +                                      }
 +                              }
 +
 +                              
pubsub_topicSubscriptionAddSubscriber(subscription,subEP);
 +
 +                              status += 
pubsub_topicSubscriptionStart(subscription);
 +
 +                      }
 +
 +                      if(status==CELIX_SUCCESS){
 +
 +                              
hashMap_put(admin->subscriptions,strdup(scope_topic),subscription);
 +
 +                              connectTopicPubSubToSerializer(admin, 
best_serializer, subscription, false);
 +                      }
 +              }
 +
 +              if (status == CELIX_SUCCESS){
 +                      pubsub_topicIncreaseNrSubscribers(subscription);
 +              }
 +      }
 +
 +      free(scope_topic);
 +      celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +      celixThreadMutex_unlock(&admin->localPublicationsLock);
 +      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +      celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +      return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_removeSubscription(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
 +      celix_status_t status = CELIX_SUCCESS;
 +
-       printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld 
topic=%s]\n",subEP->frameworkUUID,subEP->serviceID,subEP->topic);
++      printf("PSA_ZMQ: Removing subscription [FWUUID=%s bundleID=%ld 
topic=%s]\n",
++                 properties_get(subEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                 subEP->serviceID,
++                 properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
++      char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
 +      celixThreadMutex_lock(&admin->subscriptionsLock);
 +      topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
 +      if(sub!=NULL){
 +              pubsub_topicDecreaseNrSubscribers(sub);
 +              if(pubsub_topicGetNrSubscribers(sub) == 0) {
 +                      status = 
pubsub_topicSubscriptionRemoveSubscriber(sub,subEP);
 +              }
 +      }
 +      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +      if(sub==NULL){
 +              /* Maybe the endpoint was pending */
 +              celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +              if(!arrayList_removeElement(admin->noSerializerSubscriptions, 
subEP)){
 +                      status = CELIX_ILLEGAL_STATE;
 +              }
 +              celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +      }
 +
 +      free(scope_topic);
 +
 +
 +
 +      return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_addPublication(pubsub_admin_pt admin, 
pubsub_endpoint_pt pubEP) {
 +      celix_status_t status = CELIX_SUCCESS;
 +
-       printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n", pubEP->frameworkUUID, pubEP->serviceID, pubEP->scope, 
pubEP->topic);
++      printf("PSA_ZMQ: Received publication [FWUUID=%s bundleID=%ld scope=%s, 
topic=%s]\n",
++                 properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                 pubEP->serviceID,
++                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++                 properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
 +      const char* fwUUID = NULL;
 +
 +      bundleContext_getProperty(admin->bundle_context, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
 +      if (fwUUID == NULL) {
 +              printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
 +              return CELIX_INVALID_BUNDLE_CONTEXT;
 +      }
 +
-       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
++      char *scope_topic = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
-       if ((strcmp(pubEP->frameworkUUID, fwUUID) == 0) && (pubEP->endpoint == 
NULL)) {
++      if ((strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) &&
++                      (properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) == NULL)) {
 +
 +              celixThreadMutex_lock(&admin->localPublicationsLock);
 +
 +              service_factory_pt factory = (service_factory_pt) 
hashMap_get(admin->localPublications, scope_topic);
 +
 +              if (factory == NULL) {
 +                      topic_publication_pt pub = NULL;
 +                      pubsub_serializer_service_t *best_serializer = NULL;
 +                      if( (status=pubsubAdmin_getBestSerializer(admin, pubEP, 
&best_serializer)) == CELIX_SUCCESS){
 +                              status = 
pubsub_topicPublicationCreate(admin->bundle_context, pubEP, best_serializer, 
admin->ipAddress, admin->basePort, admin->maxPort, &pub);
 +                      }
 +                      else{
-                               printf("PSA_ZMQ: Cannot find a serializer for 
publishing topic %s. Adding it to pending list.\n", pubEP->topic);
++                              printf("PSA_ZMQ: Cannot find a serializer for 
publishing topic %s. Adding it to pending list.\n",
++                                         
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
 +                              
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +                              
arrayList_add(admin->noSerializerPublications,pubEP);
 +                              
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +                      }
 +
 +                      if (status == CELIX_SUCCESS) {
 +                              status = 
pubsub_topicPublicationStart(admin->bundle_context, pub, &factory);
 +                              if (status == CELIX_SUCCESS && factory != NULL) 
{
 +                                      hashMap_put(admin->localPublications, 
strdup(scope_topic), factory);
 +                                      connectTopicPubSubToSerializer(admin, 
best_serializer, pub, true);
 +                              }
 +                      } else {
-                               printf("PSA_ZMQ: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n", pubEP->scope, 
pubEP->topic, pubEP->serviceID);
++                              printf("PSA_ZMQ: Cannot create a 
topicPublication for scope=%s, topic=%s (bundle %ld).\n",
++                                         
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++                                         
properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                                         pubEP->serviceID);
 +                      }
 +              } else {
 +                      //just add the new EP to the list
 +                      topic_publication_pt pub = (topic_publication_pt) 
factory->handle;
 +                      pubsub_topicPublicationAddPublisherEP(pub, pubEP);
 +              }
 +
 +              celixThreadMutex_unlock(&admin->localPublicationsLock);
 +      }
 +      else{
 +
 +              celixThreadMutex_lock(&admin->externalPublicationsLock);
 +              array_list_pt ext_pub_list = (array_list_pt) 
hashMap_get(admin->externalPublications, scope_topic);
 +              if (ext_pub_list == NULL) {
 +                      arrayList_create(&ext_pub_list);
 +                      hashMap_put(admin->externalPublications, 
strdup(scope_topic), ext_pub_list);
 +              }
 +
 +              arrayList_add(ext_pub_list, pubEP);
 +
 +              celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +      }
 +
 +      /* Re-evaluate the pending subscriptions */
 +      celixThreadMutex_lock(&admin->pendingSubscriptionsLock);
 +
 +      hash_map_entry_pt pendingSub = 
hashMap_getEntry(admin->pendingSubscriptions, scope_topic);
 +      if (pendingSub != NULL) { //There were pending subscription for the 
just published topic. Let's connect them.
 +              char* topic = (char*) hashMapEntry_getKey(pendingSub);
 +              array_list_pt pendingSubList = (array_list_pt) 
hashMapEntry_getValue(pendingSub);
 +              int i;
 +              for (i = 0; i < arrayList_size(pendingSubList); i++) {
 +                      pubsub_endpoint_pt subEP = (pubsub_endpoint_pt) 
arrayList_get(pendingSubList, i);
 +                      pubsubAdmin_addSubscription(admin, subEP);
 +              }
 +              hashMap_remove(admin->pendingSubscriptions, scope_topic);
 +              arrayList_clear(pendingSubList);
 +              arrayList_destroy(pendingSubList);
 +              free(topic);
 +      }
 +
 +      celixThreadMutex_unlock(&admin->pendingSubscriptionsLock);
 +
 +      /* Connect the new publisher to the subscription for its topic, if there is one */
 +      celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +      topic_subscription_pt sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, scope_topic);
-       if (sub != NULL && pubEP->endpoint != NULL) {
-               pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
pubEP->endpoint);
++      if (sub != NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) != NULL) {
++              pubsub_topicSubscriptionAddConnectPublisherToPendingList(sub, 
(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +      }
 +
 +      /* And check also for ANY subscription */
 +      topic_subscription_pt any_sub = (topic_subscription_pt) 
hashMap_get(admin->subscriptions, PUBSUB_ANY_SUB_TOPIC);
-       if (any_sub != NULL && pubEP->endpoint != NULL) {
-               
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
pubEP->endpoint);
++      if (any_sub != NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL) != NULL) {
++              
pubsub_topicSubscriptionAddConnectPublisherToPendingList(any_sub, 
(char*)properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL));
 +      }
 +
 +      free(scope_topic);
 +
 +      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +      return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_removePublication(pubsub_admin_pt 
admin,pubsub_endpoint_pt pubEP){
 +      celix_status_t status = CELIX_SUCCESS;
 +      int count = 0;
 +
-       printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld 
topic=%s]\n",pubEP->frameworkUUID,pubEP->serviceID,pubEP->topic);
++      printf("PSA_ZMQ: Removing publication [FWUUID=%s bundleID=%ld 
topic=%s]\n",
++                 properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                 pubEP->serviceID,
++                 properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
 +      const char* fwUUID = NULL;
 +
 +      
bundleContext_getProperty(admin->bundle_context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID);
 +      if(fwUUID==NULL){
 +              printf("PSA_ZMQ: Cannot retrieve fwUUID.\n");
 +              return CELIX_INVALID_BUNDLE_CONTEXT;
 +      }
-       char *scope_topic = createScopeTopicKey(pubEP->scope, pubEP->topic);
++      char *scope_topic = 
createScopeTopicKey(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
-       if(strcmp(pubEP->frameworkUUID,fwUUID)==0){
++      if(strcmp(properties_get(pubEP->endpoint_props, 
OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){
 +
 +              celixThreadMutex_lock(&admin->localPublicationsLock);
 +              service_factory_pt factory = 
(service_factory_pt)hashMap_get(admin->localPublications,scope_topic);
 +              if(factory!=NULL){
 +                      topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
 +                      pubsub_topicPublicationRemovePublisherEP(pub,pubEP);
 +              }
 +              celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +              if(factory==NULL){
 +                      /* Maybe the endpoint was pending */
 +                      celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +                      
if(!arrayList_removeElement(admin->noSerializerPublications, pubEP)){
 +                              status = CELIX_ILLEGAL_STATE;
 +                      }
 +                      
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +              }
 +      }
 +      else{
 +
 +              celixThreadMutex_lock(&admin->externalPublicationsLock);
 +              array_list_pt ext_pub_list = 
(array_list_pt)hashMap_get(admin->externalPublications,scope_topic);
 +              if(ext_pub_list!=NULL){
 +                      int i;
 +                      bool found = false;
 +                      for(i=0;!found && i<arrayList_size(ext_pub_list);i++){
 +                              pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
 +                              found = pubsubEndpoint_equals(pubEP,p);
 +                              if (found){
 +                                      arrayList_remove(ext_pub_list,i);
 +                              }
 +                      }
 +                      // Check if there are more publishers on the same endpoint (happens when one Celix instance has multiple bundles publishing on the same topic)
 +                      for(i=0; i<arrayList_size(ext_pub_list);i++) {
 +                              pubsub_endpoint_pt p  = 
(pubsub_endpoint_pt)arrayList_get(ext_pub_list,i);
-                               if (strcmp(pubEP->endpoint,p->endpoint) == 0) {
++                              if 
(strcmp(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL),properties_get(p->endpoint_props, PUBSUB_ENDPOINT_URL)) == 
0) {
 +                                      count++;
 +                              }
 +                      }
 +
 +                      if(arrayList_size(ext_pub_list)==0){
 +                              hash_map_entry_pt entry = 
hashMap_getEntry(admin->externalPublications,scope_topic);
 +                              char* topic = (char*)hashMapEntry_getKey(entry);
 +                              array_list_pt list = 
(array_list_pt)hashMapEntry_getValue(entry);
 +                              
hashMap_remove(admin->externalPublications,topic);
 +                              arrayList_destroy(list);
 +                              free(topic);
 +                      }
 +              }
 +
 +              celixThreadMutex_unlock(&admin->externalPublicationsLock);
 +      }
 +
 +      /* Check if this publisher was connected to one of our subscribers*/
 +      celixThreadMutex_lock(&admin->subscriptionsLock);
 +
 +      topic_subscription_pt sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,scope_topic);
-       if(sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,pubEP->endpoint);
++      if(sub!=NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
++              
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(sub,(char*)properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_URL));
 +      }
 +
 +      /* And check also for ANY subscription */
 +      topic_subscription_pt any_sub = 
(topic_subscription_pt)hashMap_get(admin->subscriptions,PUBSUB_ANY_SUB_TOPIC);
-       if(any_sub!=NULL && pubEP->endpoint!=NULL && count == 0){
-               
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,pubEP->endpoint);
++      if(any_sub!=NULL && properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL && count == 0){
++              
pubsub_topicSubscriptionAddDisconnectPublisherToPendingList(any_sub,(char*)properties_get(pubEP->endpoint_props,
 PUBSUB_ENDPOINT_URL));
 +      }
 +
 +      free(scope_topic);
 +      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +      return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_closeAllPublications(pubsub_admin_pt admin, char 
*scope, char* topic){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      printf("PSA_ZMQ: Closing all publications\n");
 +
 +      celixThreadMutex_lock(&admin->localPublicationsLock);
 +      char *scope_topic = createScopeTopicKey(scope, topic);
 +      hash_map_entry_pt pubsvc_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->localPublications,scope_topic);
 +      if(pubsvc_entry!=NULL){
 +              char* key = (char*)hashMapEntry_getKey(pubsvc_entry);
 +              service_factory_pt factory= 
(service_factory_pt)hashMapEntry_getValue(pubsvc_entry);
 +              topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
 +              status += pubsub_topicPublicationStop(pub);
 +              disconnectTopicPubSubFromSerializer(admin, pub, true);
 +              status += pubsub_topicPublicationDestroy(pub);
 +              hashMap_remove(admin->localPublications,scope_topic);
 +              free(key);
 +              free(factory);
 +      }
 +      free(scope_topic);
 +      celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +      return status;
 +
 +}
 +
 +celix_status_t pubsubAdmin_closeAllSubscriptions(pubsub_admin_pt admin,char* 
scope,char* topic){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      printf("PSA_ZMQ: Closing all subscriptions\n");
 +
 +      celixThreadMutex_lock(&admin->subscriptionsLock);
 +      char *scope_topic = createScopeTopicKey(scope, topic);
 +      hash_map_entry_pt sub_entry = 
(hash_map_entry_pt)hashMap_getEntry(admin->subscriptions,scope_topic);
 +      if(sub_entry!=NULL){
 +              char* topic = (char*)hashMapEntry_getKey(sub_entry);
 +
 +              topic_subscription_pt ts = 
(topic_subscription_pt)hashMapEntry_getValue(sub_entry);
 +              status += pubsub_topicSubscriptionStop(ts);
 +              disconnectTopicPubSubFromSerializer(admin, ts, false);
 +              status += pubsub_topicSubscriptionDestroy(ts);
 +              hashMap_remove(admin->subscriptions,scope_topic);
 +              free(topic);
 +
 +      }
 +      free(scope_topic);
 +      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +      return status;
 +
 +}
 +
 +
 +#ifndef ANDROID
 +static celix_status_t pubsubAdmin_getIpAdress(const char* interface, char** 
ip) {
 +      celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +      struct ifaddrs *ifaddr, *ifa;
 +      char host[NI_MAXHOST];
 +
 +      if (getifaddrs(&ifaddr) != -1)
 +      {
 +              for (ifa = ifaddr; ifa != NULL && status != CELIX_SUCCESS; ifa 
= ifa->ifa_next)
 +              {
 +                      if (ifa->ifa_addr == NULL)
 +                              continue;
 +
 +                      if ((getnameinfo(ifa->ifa_addr,sizeof(struct 
sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) && 
(ifa->ifa_addr->sa_family == AF_INET)) {
 +                              if (interface == NULL) {
 +                                      *ip = strdup(host);
 +                                      status = CELIX_SUCCESS;
 +                              }
 +                              else if (strcmp(ifa->ifa_name, interface) == 0) 
{
 +                                      *ip = strdup(host);
 +                                      status = CELIX_SUCCESS;
 +                              }
 +                      }
 +              }
 +
 +              freeifaddrs(ifaddr);
 +      }
 +
 +      return status;
 +}
 +#endif
 +
 +static celix_status_t 
pubsubAdmin_addSubscriptionToPendingList(pubsub_admin_pt 
admin,pubsub_endpoint_pt subEP){
 +      celix_status_t status = CELIX_SUCCESS;
-       char* scope_topic = createScopeTopicKey(subEP->scope, subEP->topic);
++      char* scope_topic = 
createScopeTopicKey(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_SCOPE), properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +      array_list_pt pendingListPerTopic = 
hashMap_get(admin->pendingSubscriptions,scope_topic);
 +      if(pendingListPerTopic==NULL){
 +              arrayList_create(&pendingListPerTopic);
 +              
hashMap_put(admin->pendingSubscriptions,strdup(scope_topic),pendingListPerTopic);
 +      }
 +      arrayList_add(pendingListPerTopic,subEP);
 +      free(scope_topic);
 +      return status;
 +}
 +
 +celix_status_t pubsubAdmin_serializerAdded(void * handle, 
service_reference_pt reference, void * service){
 +      /* Assumption: serializers are all available at startup.
 +       * If a new (possibly better) serializer is installed and started, 
already created topic_publications/subscriptions will not be destroyed and 
recreated */
 +
 +      celix_status_t status = CELIX_SUCCESS;
 +      int i=0;
 +
 +      const char *serType = NULL;
 +      serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +      if(serType == NULL){
 +              printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
 +              return CELIX_SERVICE_EXCEPTION;
 +      }
 +
 +      pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +      celixThreadMutex_lock(&admin->serializerListLock);
 +      arrayList_add(admin->serializerList, reference);
 +      celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +      /* Now let's re-evaluate the pending */
 +      celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +
 +      for(i=0;i<arrayList_size(admin->noSerializerSubscriptions);i++){
 +              pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerSubscriptions,i);
 +              pubsub_serializer_service_t *best_serializer = NULL;
 +              pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +              if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
 +                      pubsubAdmin_addSubscription(admin, ep);
 +              }
 +      }
 +
 +      for(i=0;i<arrayList_size(admin->noSerializerPublications);i++){
 +              pubsub_endpoint_pt ep = 
(pubsub_endpoint_pt)arrayList_get(admin->noSerializerPublications,i);
 +              pubsub_serializer_service_t *best_serializer = NULL;
 +              pubsubAdmin_getBestSerializer(admin, ep, &best_serializer);
 +              if(best_serializer != NULL){ /* Finally we have a valid 
serializer! */
 +                      pubsubAdmin_addPublication(admin, ep);
 +              }
 +      }
 +
 +      celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +
 +      printf("PSA_ZMQ: %s serializer added\n",serType);
 +
 +      return status;
 +}
 +
 +celix_status_t pubsubAdmin_serializerRemoved(void * handle, 
service_reference_pt reference, void * service){
 +
 +      pubsub_admin_pt admin = (pubsub_admin_pt)handle;
 +      int i=0, j=0;
 +      const char *serType = NULL;
 +
 +      serviceReference_getProperty(reference, 
PUBSUB_SERIALIZER_TYPE_KEY,&serType);
 +      if(serType == NULL){
 +              printf("Serializer serviceReference %p has no 
pubsub_serializer.type property specified\n",reference);
 +              return CELIX_SERVICE_EXCEPTION;
 +      }
 +
 +      celixThreadMutex_lock(&admin->serializerListLock);
 +      /* Remove the serializer from the list */
 +      arrayList_removeElement(admin->serializerList, reference);
 +      celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +
 +      celixThreadMutex_lock(&admin->usedSerializersLock);
 +      array_list_pt topicPubList = 
(array_list_pt)hashMap_remove(admin->topicPublicationsPerSerializer, service);
 +      array_list_pt topicSubList = 
(array_list_pt)hashMap_remove(admin->topicSubscriptionsPerSerializer, service);
 +      celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +      /* Now destroy the topicPublications, but first put the pubsub_endpoints back on the noSerializer pending list */
 +      if(topicPubList!=NULL){
 +              for(i=0;i<arrayList_size(topicPubList);i++){
 +                      topic_publication_pt topicPub = 
(topic_publication_pt)arrayList_get(topicPubList,i);
 +                      /* Stop the topic publication */
 +                      pubsub_topicPublicationStop(topicPub);
 +                      /* Get the endpoints that are going to be orphaned */
 +                      array_list_pt pubList = 
pubsub_topicPublicationGetPublisherList(topicPub);
 +                      for(j=0;j<arrayList_size(pubList);j++){
 +                              pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(pubList,j);
 +                              /* Remove the publication */
 +                              pubsubAdmin_removePublication(admin, pubEP);
 +                              /* Reset the endpoint field, so that it will be recreated from scratch when a new serializer is found */
-                               if(pubEP->endpoint!=NULL){
-                                       free(pubEP->endpoint);
-                                       pubEP->endpoint = NULL;
++                              if(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
++                                      properties_unset(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
 +                              }
 +                              /* Add the orphan endpoint to the noSerializer 
pending list */
 +                              
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +                              
arrayList_add(admin->noSerializerPublications,pubEP);
 +                              
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +                      }
 +                      arrayList_destroy(pubList);
 +
 +                      /* Cleanup also the localPublications hashmap*/
 +                      celixThreadMutex_lock(&admin->localPublicationsLock);
 +                      hash_map_iterator_pt iter = 
hashMapIterator_create(admin->localPublications);
 +                      char *key = NULL;
 +                      service_factory_pt factory = NULL;
 +                      while(hashMapIterator_hasNext(iter)){
 +                              hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
 +                              factory = 
(service_factory_pt)hashMapEntry_getValue(entry);
 +                              topic_publication_pt pub = 
(topic_publication_pt)factory->handle;
 +                              if(pub==topicPub){
 +                                      key = (char*)hashMapEntry_getKey(entry);
 +                                      break;
 +                              }
 +                      }
 +                      hashMapIterator_destroy(iter);
 +                      if(key!=NULL){
 +                              hashMap_remove(admin->localPublications, key);
 +                              free(factory);
 +                              free(key);
 +                      }
 +                      celixThreadMutex_unlock(&admin->localPublicationsLock);
 +
 +                      /* Finally destroy the topicPublication */
 +                      pubsub_topicPublicationDestroy(topicPub);
 +              }
 +              arrayList_destroy(topicPubList);
 +      }
 +
 +      /* Now destroy the topicSubscriptions, but first put the pubsub_endpoints back on the noSerializer pending list */
 +      if(topicSubList!=NULL){
 +              for(i=0;i<arrayList_size(topicSubList);i++){
 +                      topic_subscription_pt topicSub = 
(topic_subscription_pt)arrayList_get(topicSubList,i);
 +                      /* Stop the topic subscription */
 +                      pubsub_topicSubscriptionStop(topicSub);
 +                      /* Get the endpoints that are going to be orphaned */
 +                      array_list_pt subList = 
pubsub_topicSubscriptionGetSubscribersList(topicSub);
 +                      for(j=0;j<arrayList_size(subList);j++){
 +                              pubsub_endpoint_pt subEP = 
(pubsub_endpoint_pt)arrayList_get(subList,j);
 +                              /* Remove the subscription */
 +                              pubsubAdmin_removeSubscription(admin, subEP);
 +                              /* Reset the endpoint field, so that it will be recreated from scratch when a new serializer is found */
-                               if(subEP->endpoint!=NULL){
-                                       free(subEP->endpoint);
-                                       subEP->endpoint = NULL;
++                              if(properties_get(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL)!=NULL){
++                                      properties_unset(subEP->endpoint_props, 
PUBSUB_ENDPOINT_URL);
 +                              }
 +                              /* Add the orphan endpoint to the noSerializer 
pending list */
 +                              
celixThreadMutex_lock(&admin->noSerializerPendingsLock);
 +                              
arrayList_add(admin->noSerializerSubscriptions,subEP);
 +                              
celixThreadMutex_unlock(&admin->noSerializerPendingsLock);
 +                      }
 +
 +                      /* Cleanup also the subscriptions hashmap*/
 +                      celixThreadMutex_lock(&admin->subscriptionsLock);
 +                      hash_map_iterator_pt iter = 
hashMapIterator_create(admin->subscriptions);
 +                      char *key = NULL;
 +                      while(hashMapIterator_hasNext(iter)){
 +                              hash_map_entry_pt entry = 
hashMapIterator_nextEntry(iter);
 +                              topic_subscription_pt sub = 
(topic_subscription_pt)hashMapEntry_getValue(entry);
 +                              if(sub==topicSub){
 +                                      key = (char*)hashMapEntry_getKey(entry);
 +                                      break;
 +                              }
 +                      }
 +                      hashMapIterator_destroy(iter);
 +                      if(key!=NULL){
 +                              hashMap_remove(admin->subscriptions, key);
 +                              free(key);
 +                      }
 +                      celixThreadMutex_unlock(&admin->subscriptionsLock);
 +
 +                      /* Finally destroy the topicSubscription */
 +                      pubsub_topicSubscriptionDestroy(topicSub);
 +              }
 +              arrayList_destroy(topicSubList);
 +      }
 +
 +
 +
 +      printf("PSA_ZMQ: %s serializer removed\n",serType);
 +
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +celix_status_t pubsubAdmin_matchEndpoint(pubsub_admin_pt admin, 
pubsub_endpoint_pt endpoint, double* score){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      celixThreadMutex_lock(&admin->serializerListLock);
 +      status = 
pubsub_admin_match(endpoint->topic_props,PUBSUB_ADMIN_TYPE,admin->serializerList,score);
 +      celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +      return status;
 +}
 +
 +/* This one follows the same logic as the match function */
 +static celix_status_t pubsubAdmin_getBestSerializer(pubsub_admin_pt 
admin,pubsub_endpoint_pt ep, pubsub_serializer_service_t **serSvc){
 +
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      celixThreadMutex_lock(&admin->serializerListLock);
 +      status = pubsub_admin_get_best_serializer(ep->topic_props, 
admin->serializerList, serSvc);
 +      celixThreadMutex_unlock(&admin->serializerListLock);
 +
 +      return status;
 +
 +}
 +
 +static void connectTopicPubSubToSerializer(pubsub_admin_pt 
admin,pubsub_serializer_service_t *serializer,void *topicPubSub,bool 
isPublication){
 +
 +      celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +      hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
 +      array_list_pt list = (array_list_pt)hashMap_get(map,serializer);
 +      if(list==NULL){
 +              arrayList_create(&list);
 +              hashMap_put(map,serializer,list);
 +      }
 +      arrayList_add(list,topicPubSub);
 +
 +      celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}
 +
 +static void disconnectTopicPubSubFromSerializer(pubsub_admin_pt admin,void 
*topicPubSub,bool isPublication){
 +
 +      celixThreadMutex_lock(&admin->usedSerializersLock);
 +
 +      hash_map_pt map = 
isPublication?admin->topicPublicationsPerSerializer:admin->topicSubscriptionsPerSerializer;
 +      hash_map_iterator_pt iter = hashMapIterator_create(map);
 +      while(hashMapIterator_hasNext(iter)){
 +              array_list_pt list = 
(array_list_pt)hashMapIterator_nextValue(iter);
 +              if(arrayList_removeElement(list, topicPubSub)){ //Found it!
 +                      break;
 +              }
 +      }
 +      hashMapIterator_destroy(iter);
 +
 +      celixThreadMutex_unlock(&admin->usedSerializersLock);
 +
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_admin_zmq/src/topic_publication.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_admin_zmq/src/topic_publication.c
index b612605,0000000..873cec2
mode 100644,000000..100644
--- a/pubsub/pubsub_admin_zmq/src/topic_publication.c
+++ b/pubsub/pubsub_admin_zmq/src/topic_publication.c
@@@ -1,630 -1,0 +1,631 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <czmq.h>
 +/* The following undefs prevent the collision between:
 + * - sys/syslog.h (which is included within czmq)
 + * - celix/dfi/dfi_log_util.h
 + */
 +#undef LOG_DEBUG
 +#undef LOG_WARNING
 +#undef LOG_INFO
 +#undef LOG_WARNING
 +
 +#include <stdlib.h>
 +#include <string.h>
 +#include <unistd.h>
 +
 +#include "array_list.h"
 +#include "celixbool.h"
 +#include "service_registration.h"
 +#include "utils.h"
 +#include "service_factory.h"
 +#include "version.h"
 +
 +#include "pubsub_common.h"
 +#include "pubsub_utils.h"
 +#include "pubsub/publisher.h"
 +
 +#include "topic_publication.h"
 +
 +#include "pubsub_serializer.h"
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +      #include "zmq_crypto.h"
 +
 +      #define MAX_CERT_PATH_LENGTH 512
 +#endif
 +
 +#define EP_ADDRESS_LEN                32
 +#define ZMQ_BIND_MAX_RETRY    5
 +
 +#define FIRST_SEND_DELAY      2
 +
 +struct topic_publication {
 +      zsock_t* zmq_socket;
 +      celix_thread_mutex_t socket_lock; //Protects zmq_socket access
 +      zcert_t * zmq_cert;
 +      char* endpoint;
 +      service_registration_pt svcFactoryReg;
 +      array_list_pt pub_ep_list; //List<pubsub_endpoint>
 +      hash_map_pt boundServices; //<bundle_pt,bound_service>
 +      pubsub_serializer_service_t *serializer;
 +      celix_thread_mutex_t tp_lock;
 +};
 +
 +typedef struct publish_bundle_bound_service {
 +      topic_publication_pt parent;
 +      pubsub_publisher_t service;
 +      bundle_pt bundle;
 +      char *topic;
 +      hash_map_pt msgTypes;
 +      unsigned short getCount;
 +      celix_thread_mutex_t mp_lock; //Protects publish_bundle_bound_service 
data structure
 +      bool mp_send_in_progress;
 +      array_list_pt mp_parts;
 +}* publish_bundle_bound_service_pt;
 +
 +/* Note: correct locking order is
 + * 1. tp_lock
 + * 2. mp_lock
 + * 3. socket_lock
 + *
 + * tp_lock and socket_lock are independent.
 + */
 +
 +typedef struct pubsub_msg{
 +      pubsub_msg_header_pt header;
 +      char* payload;
 +      int payloadSize;
 +}* pubsub_msg_pt;
 +
 +static unsigned int rand_range(unsigned int min, unsigned int max);
 +
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, 
bundle_pt bundle, service_registration_pt registration, void **service);
 +
 +static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle);
 +static void 
pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt 
boundSvc);
 +
 +static int pubsub_topicPublicationSend(void* handle,unsigned int msgTypeId, 
const void *msg);
 +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int 
msgTypeId, const void *inMsg, int flags);
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, 
unsigned int* msgTypeId);
 +
 +static void delay_first_send_for_late_joiners(void);
 +
 +celix_status_t pubsub_topicPublicationCreate(bundle_context_pt 
bundle_context, pubsub_endpoint_pt pubEP, pubsub_serializer_service_t 
*best_serializer, char* bindIP, unsigned int basePort, unsigned int maxPort, 
topic_publication_pt *out){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +      char* secure_topics = NULL;
 +      bundleContext_getProperty(bundle_context, "SECURE_TOPICS", (const char 
**) &secure_topics);
 +
 +      if (secure_topics){
 +              array_list_pt secure_topics_list = 
pubsub_getTopicsFromString(secure_topics);
 +
 +              int i;
 +              int secure_topics_size = arrayList_size(secure_topics_list);
 +              for (i = 0; i < secure_topics_size; i++){
 +                      char* top = arrayList_get(secure_topics_list, i);
 +                      if (strcmp(pubEP->topic, top) == 0){
 +                              printf("PSA_ZMQ_TP: Secure topic: '%s'\n", top);
 +                              pubEP->is_secure = true;
 +                      }
 +                      free(top);
 +                      top = NULL;
 +              }
 +
 +              arrayList_destroy(secure_topics_list);
 +      }
 +
 +      zcert_t* pub_cert = NULL;
 +      if (pubEP->is_secure){
 +              char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context);
 +              if (keys_bundle_dir == NULL){
 +                      return CELIX_SERVICE_EXCEPTION;
 +              }
 +
 +              const char* keys_file_path = NULL;
 +              const char* keys_file_name = NULL;
 +              bundleContext_getProperty(bundle_context, 
PROPERTY_KEYS_FILE_PATH, &keys_file_path);
 +              bundleContext_getProperty(bundle_context, 
PROPERTY_KEYS_FILE_NAME, &keys_file_name);
 +
 +              char cert_path[MAX_CERT_PATH_LENGTH];
 +
 +              //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/publisher/private/pub_{topic}.key.enc"
 +              snprintf(cert_path, MAX_CERT_PATH_LENGTH, 
"%s/META-INF/keys/publisher/private/pub_%s.key.enc", keys_bundle_dir, 
pubEP->topic);
 +              free(keys_bundle_dir);
 +              printf("PSA_ZMQ_TP: Loading key '%s'\n", cert_path);
 +
 +              pub_cert = get_zcert_from_encoded_file((char *) keys_file_path, 
(char *) keys_file_name, cert_path);
 +              if (pub_cert == NULL){
 +                      printf("PSA_ZMQ_TP: Cannot load key '%s'\n", cert_path);
 +                      printf("PSA_ZMQ_TP: Topic '%s' NOT SECURED !\n", 
pubEP->topic);
 +                      pubEP->is_secure = false;
 +              }
 +      }
 +#endif
 +
 +      zsock_t* socket = zsock_new (ZMQ_PUB);
 +      if(socket==NULL){
 +              #ifdef BUILD_WITH_ZMQ_SECURITY
 +                      if (pubEP->is_secure){
 +                              zcert_destroy(&pub_cert);
 +                      }
 +              #endif
 +
 +        perror("Error for zmq_socket");
 +              return CELIX_SERVICE_EXCEPTION;
 +      }
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +      if (pubEP->is_secure){
 +              zcert_apply (pub_cert, socket); // apply certificate to socket
 +              zsock_set_curve_server (socket, true); // setup the publisher's 
socket to use the curve functions
 +      }
 +#endif
 +
 +      int rv = -1, retry=0;
 +      char* ep = malloc(EP_ADDRESS_LEN);
 +    char bindAddress[EP_ADDRESS_LEN];
 +
 +      while(rv==-1 && retry<ZMQ_BIND_MAX_RETRY){
 +              /* The port is randomized because the same bundle may publish on different topics */
 +              unsigned int port = rand_range(basePort,maxPort);
 +              memset(ep,0,EP_ADDRESS_LEN);
 +        memset(bindAddress, 0, EP_ADDRESS_LEN);
 +
 +              snprintf(ep,EP_ADDRESS_LEN,"tcp://%s:%u",bindIP,port);
-         snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); 
//NOTE using a different bind addres than endpoint address
++        snprintf(bindAddress, EP_ADDRESS_LEN, "tcp://0.0.0.0:%u", port); 
//NOTE using a different bind address than endpoint address
 +              rv = zsock_bind (socket, "%s", bindAddress);
 +        if (rv == -1) {
 +            perror("Error for zmq_bind");
 +        }
 +              retry++;
 +      }
 +
 +      if(rv == -1){
 +              free(ep);
 +              return CELIX_SERVICE_EXCEPTION;
 +      }
 +
 +      /* ZMQ stuffs are all fine at this point. Let's create and initialize 
the structure */
 +
 +      topic_publication_pt pub = calloc(1,sizeof(*pub));
 +
 +      arrayList_create(&(pub->pub_ep_list));
 +      pub->boundServices = hashMap_create(NULL,NULL,NULL,NULL);
 +      celixThreadMutex_create(&(pub->tp_lock),NULL);
 +
 +      pub->endpoint = ep;
 +      pub->zmq_socket = socket;
 +      pub->serializer = best_serializer;
 +
 +      celixThreadMutex_create(&(pub->socket_lock),NULL);
 +
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +      if (pubEP->is_secure){
 +              pub->zmq_cert = pub_cert;
 +      }
 +#endif
 +
 +      pubsub_topicPublicationAddPublisherEP(pub,pubEP);
 +
 +      *out = pub;
 +
 +      return status;
 +}
 +
 +/*
 + * Releases everything owned by a topic publication and finally the
 + * publication itself.
 + *
 + * The endpoint string, the endpoint list, all bundle-bound services and (when
 + * built with ZMQ security) the certificate are released while tp_lock is held;
 + * the ZMQ socket is destroyed afterwards while socket_lock is held. Both
 + * mutexes are destroyed before pub is freed. Always returns CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_topicPublicationDestroy(topic_publication_pt pub){
 +      celixThreadMutex_lock(&pub->tp_lock);
 +
 +      free(pub->endpoint);
 +      arrayList_destroy(pub->pub_ep_list);
 +
 +      /* Destroy each bound service first; the map is then dropped without freeing keys or values. */
 +      hash_map_iterator_pt boundIt = hashMapIterator_create(pub->boundServices);
 +      for (; hashMapIterator_hasNext(boundIt); ) {
 +              publish_bundle_bound_service_pt boundSvc = hashMapIterator_nextValue(boundIt);
 +              pubsub_destroyPublishBundleBoundService(boundSvc);
 +      }
 +      hashMapIterator_destroy(boundIt);
 +      hashMap_destroy(pub->boundServices,false,false);
 +
 +      pub->svcFactoryReg = NULL;
 +      pub->serializer = NULL;
 +#ifdef BUILD_WITH_ZMQ_SECURITY
 +      zcert_destroy(&pub->zmq_cert);
 +#endif
 +
 +      celixThreadMutex_unlock(&pub->tp_lock);
 +      celixThreadMutex_destroy(&pub->tp_lock);
 +
 +      /* The socket has its own lock; take it for the duration of the teardown. */
 +      celixThreadMutex_lock(&pub->socket_lock);
 +      zsock_destroy(&pub->zmq_socket);
 +      celixThreadMutex_unlock(&pub->socket_lock);
 +      celixThreadMutex_destroy(&pub->socket_lock);
 +
 +      free(pub);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/*
 + * Registers a service factory under PUBSUB_PUBLISHER_SERVICE_NAME on behalf of
 + * this publication.
 + *
 + * The topic and scope service properties are read from the endpoint_props of
 + * the first endpoint in pub->pub_ep_list. On success the newly allocated
 + * factory is handed to the caller through *svcFactory. On failure *svcFactory
 + * is left untouched and the factory allocated here is freed again.
 + *
 + * Returns CELIX_SUCCESS, CELIX_ENOMEM when the factory cannot be allocated,
 + * CELIX_SERVICE_EXCEPTION when the publication holds no endpoint, or the
 + * status reported by bundleContext_registerServiceFactory.
 + */
 +celix_status_t pubsub_topicPublicationStart(bundle_context_pt bundle_context,topic_publication_pt pub,service_factory_pt* svcFactory){
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      /* Let's register the new service */
 +
 +      pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pub->pub_ep_list,0);
 +
 +      if(pubEP!=NULL){
 +              service_factory_pt factory = calloc(1, sizeof(*factory));
 +              if(factory==NULL){
 +                      /* Without this check the assignments below would dereference NULL. */
 +                      return CELIX_ENOMEM;
 +              }
 +              factory->handle = pub;
 +              factory->getService = pubsub_topicPublicationGetService;
 +              factory->ungetService = pubsub_topicPublicationUngetService;
 +
 +              properties_pt props = properties_create();
-               properties_set(props,PUBSUB_PUBLISHER_TOPIC,pubEP->topic);
-               properties_set(props,PUBSUB_PUBLISHER_SCOPE,pubEP->scope);
++              properties_set(props,PUBSUB_PUBLISHER_TOPIC,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC));
++              properties_set(props,PUBSUB_PUBLISHER_SCOPE,properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE));
 +              properties_set(props,"service.version", PUBSUB_PUBLISHER_SERVICE_VERSION);
 +
 +              status = bundleContext_registerServiceFactory(bundle_context,PUBSUB_PUBLISHER_SERVICE_NAME,factory,props,&(pub->svcFactoryReg));
 +
 +              if(status != CELIX_SUCCESS){
 +                      properties_destroy(props);
 +                      /* The factory was never handed out, so it must be released here. */
 +                      free(factory);
-                       printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register 
ServiceFactory for topic %s (bundle %ld).\n",pubEP->topic,pubEP->serviceID);
++                      printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot register ServiceFactory for topic %s (bundle %ld).\n",
++                                 properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),pubEP->serviceID);
 +              }
 +              else{
 +                      *svcFactory = factory;
 +              }
 +      }
 +      else{
 +              printf("PSA_ZMQ_PSA_ZMQ_TP: Cannot find pubsub_endpoint after adding it...Should never happen!\n");
 +              status = CELIX_SERVICE_EXCEPTION;
 +      }
 +
 +      return status;
 +}
 +
 +/*
 + * Unregisters the service factory registration stored in pub->svcFactoryReg
 + * and returns the status reported by serviceRegistration_unregister.
 + */
 +celix_status_t pubsub_topicPublicationStop(topic_publication_pt pub){
 +      celix_status_t unregisterStatus = serviceRegistration_unregister(pub->svcFactoryReg);
 +      return unregisterStatus;
 +}
 +
 +/*
 + * Attaches a publisher endpoint to this publication.
 + *
 + * While holding tp_lock, the endpoint's PUBSUB_ENDPOINT_URL field is set to the
 + * publication's endpoint address and the endpoint is appended to pub_ep_list.
 + * The result of pubsubEndpoint_setField is not checked; the function always
 + * returns CELIX_SUCCESS.
 + */
 +celix_status_t pubsub_topicPublicationAddPublisherEP(topic_publication_pt 
pub,pubsub_endpoint_pt ep){
 +
 +      celixThreadMutex_lock(&(pub->tp_lock));
-       ep->endpoint = strdup(pub->endpoint);
++    pubsubEndpoint_setField(ep, PUBSUB_ENDPOINT_URL, pub->endpoint);
 +      /* The list stores the caller's pointer itself, not a copy of the endpoint. */
 +      arrayList_add(pub->pub_ep_list,ep);
 +      celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/*
 + * Detaches the first endpoint in pub_ep_list that pubsubEndpoint_equals()
 + * reports as equal to ep. The search and removal run under tp_lock.
 + *
 + * The element actually stored in the list is removed, not the caller's ep:
 + * the two can be equal by content while being different objects, in which
 + * case removing ep would leave the stored entry in place if the list matches
 + * elements by pointer (NOTE(review): the list's equality function is not
 + * visible here).
 + *
 + * Always returns CELIX_SUCCESS, also when nothing matched.
 + */
 +celix_status_t pubsub_topicPublicationRemovePublisherEP(topic_publication_pt pub,pubsub_endpoint_pt ep){
 +
 +      celixThreadMutex_lock(&(pub->tp_lock));
 +      for (int i = 0; i < arrayList_size(pub->pub_ep_list); i++) {
 +              pubsub_endpoint_pt e = arrayList_get(pub->pub_ep_list, i);
 +              if(pubsubEndpoint_equals(ep, e)) {
 +                  arrayList_removeElement(pub->pub_ep_list,e);
 +                  break;
 +              }
 +      }
 +      celixThreadMutex_unlock(&(pub->tp_lock));
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/*
 + * Returns the result of arrayList_clone() on the publication's endpoint list,
 + * taken while tp_lock is held.
 + */
 +array_list_pt pubsub_topicPublicationGetPublisherList(topic_publication_pt pub){
 +      celixThreadMutex_lock(&pub->tp_lock);
 +      array_list_pt snapshot = arrayList_clone(pub->pub_ep_list);
 +      celixThreadMutex_unlock(&pub->tp_lock);
 +
 +      return snapshot;
 +}
 +
 +
 +/*
 + * Service-factory "get" callback: hands out the publisher service bound to the
 + * requesting bundle.
 + *
 + * handle is the topic publication. The first request from a bundle creates a
 + * bound service and stores it in boundServices keyed by the bundle; later
 + * requests only increment its getCount. All of this runs under tp_lock.
 + *
 + * On success *service points at the bound service's embedded service struct.
 + * If the bound service cannot be created, *service is set to NULL and
 + * CELIX_ENOMEM is returned instead of handing out a pointer derived from NULL.
 + */
 +static celix_status_t pubsub_topicPublicationGetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service) {
 +      celix_status_t  status = CELIX_SUCCESS;
 +
 +      topic_publication_pt publish = (topic_publication_pt)handle;
 +
 +      celixThreadMutex_lock(&(publish->tp_lock));
 +
 +      publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt)hashMap_get(publish->boundServices,bundle);
 +      if(bound==NULL){
 +              bound = pubsub_createPublishBundleBoundService(publish,bundle);
 +              if(bound!=NULL){
 +                      hashMap_put(publish->boundServices,bundle,bound);
 +              }
 +      }
 +      else{
 +              bound->getCount++;
 +      }
 +
 +      if(bound!=NULL){
 +              *service = &bound->service;
 +      }
 +      else{
 +              /* Creation only returns NULL when its allocation failed. */
 +              *service = NULL;
 +              status = CELIX_ENOMEM;
 +      }
 +
 +      celixThreadMutex_unlock(&(publish->tp_lock));
 +
 +      return status;
 +}
 +
 +/*
 + * Service-factory "unget" callback: drops one reference to the publisher
 + * service bound to the given bundle.
 + *
 + * Under tp_lock the bundle's bound service is looked up; its getCount is
 + * decremented and, once it reaches zero, the bound service is destroyed and
 + * removed from boundServices. An unget for a bundle without a bound service is
 + * only logged. *service is always reset to NULL and CELIX_SUCCESS is returned.
 + */
 +static celix_status_t pubsub_topicPublicationUngetService(void* handle, bundle_pt bundle, service_registration_pt registration, void **service)  {
 +      topic_publication_pt tp = (topic_publication_pt)handle;
 +
 +      celixThreadMutex_lock(&tp->tp_lock);
 +
 +      publish_bundle_bound_service_pt entry = (publish_bundle_bound_service_pt)hashMap_get(tp->boundServices,bundle);
 +      if(entry==NULL){
 +              long bundleId = -1;
 +              bundle_getBundleId(bundle,&bundleId);
 +              printf("PSA_ZMQ_TP: Unexpected ungetService call for bundle %ld.\n", bundleId);
 +      }
 +      else if(--entry->getCount==0){
 +              /* Last user is gone: destroy first, then forget the map entry. */
 +              pubsub_destroyPublishBundleBoundService(entry);
 +              hashMap_remove(tp->boundServices,bundle);
 +      }
 +
 +      /* service should be never used for unget, so let's set the pointer to NULL */
 +      *service = NULL;
 +
 +      celixThreadMutex_unlock(&tp->tp_lock);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/*
 + * Sends one pubsub message as two ZMQ frames (header, then payload) and frees
 + * the message.
 + *
 + * zmq_socket  socket the frames are sent on
 + * msg         message to send; msg, msg->header and msg->payload are always
 + *             freed before returning, whether or not sending succeeded
 + * last        true when this is the final part of a multipart message; the
 + *             payload frame is then sent without ZFRAME_MORE
 + *
 + * Returns true only if both frames were created and sent. Nothing is sent
 + * when a frame could not be created, and the payload is not sent once the
 + * header send has failed, so a payload frame never goes out without its
 + * header. Frames that were not handed over to ZMQ are destroyed here.
 + */
 +static bool send_pubsub_msg(zsock_t* zmq_socket, pubsub_msg_pt msg, bool last){
 +
 +      bool ret = true;
 +
 +      zframe_t* headerMsg = zframe_new(msg->header, sizeof(struct pubsub_msg_header));
 +      zframe_t* payloadMsg = zframe_new(msg->payload, msg->payloadSize);
 +      if (headerMsg == NULL || payloadMsg == NULL) ret=false;
 +
 +      delay_first_send_for_late_joiners();
 +
 +      if (ret && zframe_send(&headerMsg,zmq_socket, ZFRAME_MORE) == -1) ret=false;
 +
 +      if (ret && zframe_send(&payloadMsg,zmq_socket, last ? 0 : ZFRAME_MORE) == -1) ret=false;
 +
 +      if (!ret){
 +              zframe_destroy(&headerMsg);
 +              zframe_destroy(&payloadMsg);
 +      }
 +
 +      free(msg->header);
 +      free(msg->payload);
 +      free(msg);
 +
 +      return ret;
 +
 +}
 +
 +/*
 + * Sends all queued parts of a multipart message in list order and empties the
 + * list.
 + *
 + * Each part is passed to send_pubsub_msg(), which frees it. Once a part fails
 + * to send, the remaining parts are no longer sent but are freed here, so that
 + * clearing the list afterwards does not leak them.
 + *
 + * Returns true only if every part was sent successfully.
 + */
 +static bool send_pubsub_mp_msg(zsock_t* zmq_socket, array_list_pt mp_msg_parts){
 +
 +      bool ret = true;
 +
 +      unsigned int mp_num = arrayList_size(mp_msg_parts);
 +      for(unsigned int i=0;i<mp_num;i++){
 +              pubsub_msg_pt part = (pubsub_msg_pt)arrayList_get(mp_msg_parts,i);
 +              if(ret){
 +                      ret = send_pubsub_msg(zmq_socket, part, (i==mp_num-1));
 +              }
 +              else{
 +                      /* Not handed to send_pubsub_msg(), so release it the same way it would. */
 +                      free(part->header);
 +                      free(part->payload);
 +                      free(part);
 +              }
 +      }
 +      arrayList_clear(mp_msg_parts);
 +
 +      return ret;
 +
 +}
 +
 +/*
 + * Sends a single (non-multipart) message by delegating to
 + * pubsub_topicPublicationSendMultipart() with both the FIRST and LAST flags set.
 + */
 +static int pubsub_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *msg) {
 +      const int singleMsgFlags = PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG;
 +
 +      return pubsub_topicPublicationSendMultipart(handle, msgTypeId, msg, singleMsgFlags);
 +}
 +
 +/*
 + * Serializes inMsg and either sends it directly or queues it as part of a
 + * multipart message, depending on flags.
 + *
 + * handle     the bundle-bound publisher service
 + * msgTypeId  key used to look up the message serializer in bound->msgTypes
 + * inMsg      message to serialize
 + * flags      PUBSUB_PUBLISHER_FIRST_MSG, _PART_MSG, _LAST_MSG, or
 + *            FIRST|LAST for a plain single send
 + *
 + * Returns 0 when the message was accepted (a failed send is only logged),
 + * -1 when no serializer is known for msgTypeId, -3 when a new multipart
 + * message is started while one is in progress, and -4 for a part/last message
 + * without a preceding first part or an unknown flag combination.
 + *
 + * The parent's tp_lock and the bound service's mp_lock are held for the whole
 + * call. Messages that are neither sent nor queued (-4) are freed here together
 + * with their header and serialized payload.
 + */
 +static int pubsub_topicPublicationSendMultipart(void *handle, unsigned int msgTypeId, const void *inMsg, int flags){
 +
 +      int status = 0;
 +
 +      publish_bundle_bound_service_pt bound = (publish_bundle_bound_service_pt) handle;
 +
 +      celixThreadMutex_lock(&(bound->parent->tp_lock));
 +      celixThreadMutex_lock(&(bound->mp_lock));
 +      if( (flags & PUBSUB_PUBLISHER_FIRST_MSG) && !(flags & PUBSUB_PUBLISHER_LAST_MSG) && bound->mp_send_in_progress){ //means a real mp_msg
 +              printf("PSA_ZMQ_TP: Multipart send already in progress. Cannot process a new one.\n");
 +              celixThreadMutex_unlock(&(bound->mp_lock));
 +              celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +              return -3;
 +      }
 +
 +      pubsub_msg_serializer_t* msgSer = (pubsub_msg_serializer_t*)hashMap_get(bound->msgTypes, (void*)(uintptr_t)msgTypeId);
 +
 +      if (msgSer!= NULL) {
 +              int major=0, minor=0;
 +
 +              pubsub_msg_header_pt msg_hdr = calloc(1,sizeof(struct pubsub_msg_header));
 +              strncpy(msg_hdr->topic,bound->topic,MAX_TOPIC_LEN-1);
 +              msg_hdr->type = msgTypeId;
 +
 +              if (msgSer->msgVersion != NULL){
 +                      version_getMajor(msgSer->msgVersion, &major);
 +                      version_getMinor(msgSer->msgVersion, &minor);
 +                      msg_hdr->major = major;
 +                      msg_hdr->minor = minor;
 +              }
 +
 +              void *serializedOutput = NULL;
 +              size_t serializedOutputLen = 0;
 +              msgSer->serialize(msgSer,inMsg,&serializedOutput, &serializedOutputLen);
 +
 +              pubsub_msg_pt msg = calloc(1,sizeof(struct pubsub_msg));
 +              msg->header = msg_hdr;
 +              msg->payload = (char*)serializedOutput;
 +              msg->payloadSize = serializedOutputLen;
 +              bool snd = true;
 +
 +              switch(flags){
 +              case PUBSUB_PUBLISHER_FIRST_MSG:
 +                      bound->mp_send_in_progress = true;
 +                      arrayList_add(bound->mp_parts,msg);
 +                      break;
 +              case PUBSUB_PUBLISHER_PART_MSG:
 +                      if(!bound->mp_send_in_progress){
 +                              printf("PSA_ZMQ_TP: ERROR: received msg part without the first part.\n");
 +                              status = -4;
 +                      }
 +                      else{
 +                              arrayList_add(bound->mp_parts,msg);
 +                      }
 +                      break;
 +              case PUBSUB_PUBLISHER_LAST_MSG:
 +                      if(!bound->mp_send_in_progress){
 +                              printf("PSA_ZMQ_TP: ERROR: received end msg without the first part.\n");
 +                              status = -4;
 +                      }
 +                      else{
 +                              arrayList_add(bound->mp_parts,msg);
 +                              snd = send_pubsub_mp_msg(bound->parent->zmq_socket,bound->mp_parts);
 +                              bound->mp_send_in_progress = false;
 +                      }
 +                      break;
 +              case PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG:    //Normal send case
 +                      snd = send_pubsub_msg(bound->parent->zmq_socket,msg,true);
 +                      break;
 +              default:
 +                      printf("PSA_ZMQ_TP: ERROR: Invalid MP flags combination\n");
 +                      status = -4;
 +                      break;
 +              }
 +
 +              if(status==-4){
 +                      /* The message was neither queued nor sent, so nobody else frees its parts. */
 +                      free(msg->header);
 +                      free(msg->payload);
 +                      free(msg);
 +              }
 +
 +              if(!snd){
 +                      printf("PSA_ZMQ_TP: Failed to send %s message %u.\n",flags == (PUBSUB_PUBLISHER_FIRST_MSG | PUBSUB_PUBLISHER_LAST_MSG) ? "single" : "multipart", msgTypeId);
 +              }
 +
 +      } else {
 +              /* msgTypeId is unsigned, so it is printed with %u. */
 +              printf("PSA_ZMQ_TP: No msg serializer available for msg type id %u\n", msgTypeId);
 +              status=-1;
 +      }
 +
 +      celixThreadMutex_unlock(&(bound->mp_lock));
 +      celixThreadMutex_unlock(&(bound->parent->tp_lock));
 +
 +      return status;
 +
 +}
 +
 +/*
 + * Derives the local message type id for a message type name by hashing the
 + * name with utils_stringHash(). The handle is not used. Always returns 0.
 + */
 +static int pubsub_localMsgTypeIdForUUID(void* handle, const char* msgType, unsigned int* msgTypeId){
 +      unsigned int hashedId = utils_stringHash(msgType);
 +
 +      *msgTypeId = hashedId;
 +      return 0;
 +}
 +
 +
 +/*
 + * Returns a pseudo-random number in the inclusive range [min, max], drawn
 + * from random().
 + *
 + * The draw is scaled by RAND_MAX + 1 so that the scaling factor stays strictly
 + * below 1.0; dividing by RAND_MAX alone yields max + 1 whenever random()
 + * returns RAND_MAX. The width of the range is computed in floating point so
 + * that it cannot wrap around for min = 0, max = UINT_MAX. If max is not
 + * greater than min, min is returned.
 + */
 +static unsigned int rand_range(unsigned int min, unsigned int max){
 +
 +      if (max <= min) {
 +              return min;
 +      }
 +
 +      double scaled = ((double)random()) / ((double)RAND_MAX + 1.0);
 +      double width = (double)(max - min) + 1.0;
 +
 +      return (unsigned int)(width * scaled) + min;
 +
 +}
 +
 +/*
 + * Allocates and initializes the publisher service instance that is handed out
 + * to one specific bundle for the given topic publication.
 + *
 + * The new instance starts with getCount = 1, no multipart send in progress,
 + * its own mp_lock, an empty mp_parts list and a private copy of the topic
 + * name. When the publication has a serializer, the serializer fills
 + * bound->msgTypes for this bundle.
 + *
 + * Returns the new bound service, or NULL when the allocation fails.
 + */
 +static publish_bundle_bound_service_pt 
pubsub_createPublishBundleBoundService(topic_publication_pt tp,bundle_pt 
bundle){
 +
 +      //PRECOND: the caller holds tp->tp_lock
 +
 +      publish_bundle_bound_service_pt bound = calloc(1, sizeof(*bound));
 +
 +      if (bound != NULL) {
 +
 +              bound->parent = tp;
 +              bound->bundle = bundle;
 +              bound->getCount = 1;
 +              bound->mp_send_in_progress = false;
 +              celixThreadMutex_create(&bound->mp_lock,NULL);
 +
 +              /* Without a serializer msgTypes stays NULL (zeroed by calloc). */
 +              if(tp->serializer != NULL){
 +                      
tp->serializer->createSerializerMap(tp->serializer->handle,bundle,&bound->msgTypes);
 +              }
 +
 +              arrayList_create(&bound->mp_parts);
 +
 +              /* The topic name is taken from the first endpoint of the publication. */
 +              /* NOTE(review): neither pubEP nor the looked-up topic property is checked for NULL before use - confirm the list is never empty here. */
 +              pubsub_endpoint_pt pubEP = 
(pubsub_endpoint_pt)arrayList_get(bound->parent->pub_ep_list,0);
-               bound->topic=strdup(pubEP->topic);
++              bound->topic=strdup(properties_get(pubEP->endpoint_props, 
PUBSUB_ENDPOINT_TOPIC));
 +
 +              /* The service callbacks receive this bound instance as their handle. */
 +              bound->service.handle = bound;
 +              bound->service.localMsgTypeIdForMsgType = 
pubsub_localMsgTypeIdForUUID;
 +              bound->service.send = pubsub_topicPublicationSend;
 +              bound->service.sendMultipart = 
pubsub_topicPublicationSendMultipart;
 +
 +      }
 +
 +      return bound;
 +}
 +
 +/*
 + * Releases a bundle-bound publisher service: its serializer map (through the
 + * parent's serializer, when both exist), its list of multipart parts, its
 + * topic string, its mp_lock and finally the instance itself.
 + */
 +static void pubsub_destroyPublishBundleBoundService(publish_bundle_bound_service_pt boundSvc){
 +
 +      //PRECOND lock on tp->lock
 +
 +      celixThreadMutex_lock(&boundSvc->mp_lock);
 +
 +      topic_publication_pt owner = boundSvc->parent;
 +      bool hasSerializerMap = (owner->serializer != NULL) && (boundSvc->msgTypes != NULL);
 +      if(hasSerializerMap){
 +              owner->serializer->destroySerializerMap(owner->serializer->handle, boundSvc->msgTypes);
 +      }
 +
 +      if(boundSvc->mp_parts!=NULL){
 +              arrayList_destroy(boundSvc->mp_parts);
 +      }
 +
 +      /* free() accepts NULL, so no guard is needed for the topic. */
 +      free(boundSvc->topic);
 +
 +      celixThreadMutex_unlock(&boundSvc->mp_lock);
 +      celixThreadMutex_destroy(&boundSvc->mp_lock);
 +
 +      free(boundSvc);
 +
 +}
 +
 +/*
 + * Sleeps for FIRST_SEND_DELAY seconds the first time it is called and does
 + * nothing on later calls. The "already delayed" state is a function-local
 + * static flag.
 + */
 +static void delay_first_send_for_late_joiners(){
 +
 +      static bool firstSend = true;
 +
 +      if(!firstSend){
 +              return;
 +      }
 +
 +      printf("PSA_ZMQ_TP: Delaying first send for late joiners...\n");
 +      sleep(FIRST_SEND_DELAY);
 +      firstSend = false;
 +}

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/etcd_watcher.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_discovery/src/etcd_watcher.c
index 3c3a5a8,0000000..726269a
mode 100644,000000..100644
--- a/pubsub/pubsub_discovery/src/etcd_watcher.c
+++ b/pubsub/pubsub_discovery/src/etcd_watcher.c
@@@ -1,290 -1,0 +1,344 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <stdbool.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
++#include <jansson.h>
 +
 +#include "celix_log.h"
 +#include "constants.h"
 +
 +#include "etcd.h"
 +#include "etcd_watcher.h"
 +
 +#include "pubsub_discovery.h"
 +#include "pubsub_discovery_impl.h"
 +
 +
 +
 +/* Buffer sizes: root path buffers and the individual fields parsed out of an etcd key. */
 +#define MAX_ROOTNODE_LENGTH             128
 +#define MAX_LOCALNODE_LENGTH            4096
 +#define MAX_FIELD_LENGTH                128
 +
 +/* Bundle context property holding the etcd root path, and the path used when it is not set. */
 +#define CFG_ETCD_ROOT_PATH              "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
 +#define DEFAULT_ETCD_ROOTPATH           "pubsub/discovery"
 +
 +/* NOTE(review): the server IP/port settings are not referenced in the visible part of this file. */
 +#define CFG_ETCD_SERVER_IP              "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
 +#define DEFAULT_ETCD_SERVER_IP          "127.0.0.1"
 +
 +#define CFG_ETCD_SERVER_PORT            "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
 +#define DEFAULT_ETCD_SERVER_PORT        2379
 +
 +// be careful - this should be higher than the curl timeout
 +// The watcher thread also derives its back-off sleep (a quarter of this value) from DEFAULT_ETCD_TTL.
 +#define CFG_ETCD_TTL                    "DISCOVERY_ETCD_TTL"
 +#define DEFAULT_ETCD_TTL                30
 +
 +
 +/* State of one etcd watcher: a thread that follows the publishers of a single scope/topic. */
 +struct etcd_watcher {
 +      /* Discovery instance that receives the add/remove node notifications. */
 +      pubsub_discovery_pt pubsub_discovery;
 +
 +      /* Taken around accesses to 'running'. */
 +      celix_thread_mutex_t watcherLock;
 +      celix_thread_t watcherThread;
 +
 +      /* Copies (strdup) of the scope and topic given at creation; freed on destroy. */
 +      char *scope;
 +      char *topic;
 +      /* Cleared by etcdWatcher_stop() to make the watcher thread leave its loop. */
 +      volatile bool running;
 +};
 +
 +/*
 + * NOTE(review): this layout is declared again, field for field, in
 + * etcd_writer.c and is not used in the visible part of this file - confirm
 + * whether this copy is still needed.
 + */
 +struct etcd_writer {
 +      pubsub_discovery_pt pubsub_discovery;
 +      celix_thread_mutex_t localPubsLock;
 +      array_list_pt localPubs;
 +      volatile bool running;
 +      celix_thread_t writerThread;
 +};
 +
 +
 +/*
 + * Writes "<root>/<scope>/<topic>" into rootNode (at most rootNodeLen bytes).
 + * <root> is the CFG_ETCD_ROOT_PATH bundle property, or DEFAULT_ETCD_ROOTPATH
 + * when the property cannot be read or is not set.
 + * Note that the rootNode shouldn't have a leading slash.
 + * Always returns CELIX_SUCCESS.
 + */
 +static celix_status_t etcdWatcher_getTopicRootPath(bundle_context_pt context, const char *scope, const char *topic, char* rootNode, int rootNodeLen) {
 +      const char* configuredRoot = NULL;
 +      celix_status_t lookup = bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configuredRoot);
 +
 +      const char* base = DEFAULT_ETCD_ROOTPATH;
 +      if (lookup == CELIX_SUCCESS && configuredRoot != NULL) {
 +              base = configuredRoot;
 +      }
 +
 +      snprintf(rootNode, rootNodeLen, "%s/%s/%s", base, scope, topic);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +/*
 + * Copies the etcd root path into rootNode, which must provide at least
 + * MAX_ROOTNODE_LENGTH bytes. The path is the CFG_ETCD_ROOT_PATH bundle
 + * property, or DEFAULT_ETCD_ROOTPATH when the property cannot be read or is
 + * not set.
 + *
 + * The result is always NUL-terminated; a path that does not fit is truncated
 + * to MAX_ROOTNODE_LENGTH - 1 characters. Always returns CELIX_SUCCESS.
 + */
 +static celix_status_t etcdWatcher_getRootPath(bundle_context_pt context, char* rootNode) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      const char* rootPath = NULL;
 +
 +      if (((bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &rootPath)) != CELIX_SUCCESS) || (!rootPath)) {
 +              rootPath = DEFAULT_ETCD_ROOTPATH;
 +      }
 +
 +      /* strncpy does not terminate the destination when the source fills it completely. */
 +      strncpy(rootNode, rootPath, MAX_ROOTNODE_LENGTH - 1);
 +      rootNode[MAX_ROOTNODE_LENGTH - 1] = '\0';
 +
 +      return status;
 +}
 +
 +
 +/*
 + * Callback passed to etcd_get_directory(): turns one key/value pair into a
 + * publisher endpoint and reports it to the discovery instance given in arg.
 + * Entries that cannot be converted are ignored.
 + */
 +static void add_node(const char *key, const char *value, void* arg) {
 +      pubsub_discovery_pt discovery = (pubsub_discovery_pt) arg;
 +      pubsub_endpoint_pt endpoint = NULL;
 +
 +      celix_status_t status = etcdWatcher_getPublisherEndpointFromKey(discovery, key, value, &endpoint);
 +      if (status != 0 || endpoint == NULL) {
 +              return;
 +      }
 +
 +      pubsub_discovery_addNode(discovery, endpoint);
 +}
 +
 +/*
 + * Walks the etcd directory at rootPath and feeds every entry to add_node().
 + * highestModified is passed through to etcd_get_directory().
 + * Returns CELIX_ILLEGAL_ARGUMENT when etcd_get_directory() reports a non-zero
 + * result, CELIX_SUCCESS otherwise.
 + */
 +static celix_status_t etcdWatcher_addAlreadyExistingPublishers(pubsub_discovery_pt ps_discovery, const char *rootPath, long long * highestModified) {
 +      return etcd_get_directory(rootPath, add_node, ps_discovery, highestModified)
 +                      ? CELIX_ILLEGAL_ARGUMENT
 +                      : CELIX_SUCCESS;
 +}
 +
 +/*
 + * Builds a publisher endpoint from one etcd key/value pair.
 + *
 + * etcdKey    expected as "/<rootPath>/<scope>/<topic>/<fwUUID>/<serviceId>..."
 + * etcdValue  either a JSON object carrying the PUBSUB_ENDPOINT_* fields, or a
 + *            plain string that is used as the endpoint URL as is
 + * pubEP      receives the endpoint created by pubsubEndpoint_create()
 + *
 + * Each key field is scanned with a width limit of MAX_FIELD_LENGTH - 1, so an
 + * overlong key component cannot overflow the local buffers.
 + *
 + * Returns CELIX_ENOMEM when the scan expression cannot be allocated,
 + * CELIX_ILLEGAL_STATE when the key does not contain all four fields (e.g. for
 + * a removed directory), otherwise the status of pubsubEndpoint_create() with
 + * the results of the subsequent pubsubEndpoint_setField() calls added to it.
 + */
 +celix_status_t etcdWatcher_getPublisherEndpointFromKey(pubsub_discovery_pt pubsub_discovery, const char* etcdKey, const char* etcdValue, pubsub_endpoint_pt* pubEP) {
 +
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      char rootPath[MAX_ROOTNODE_LENGTH];
 +      char *expr = NULL;
 +      char scope[MAX_FIELD_LENGTH];
 +      char topic[MAX_FIELD_LENGTH];
 +      char fwUUID[MAX_FIELD_LENGTH];
 +      char serviceId[MAX_FIELD_LENGTH];
 +
 +      memset(rootPath,0,MAX_ROOTNODE_LENGTH);
 +      memset(scope,0,MAX_FIELD_LENGTH);
 +      memset(topic,0,MAX_FIELD_LENGTH);
 +      memset(fwUUID,0,MAX_FIELD_LENGTH);
 +      memset(serviceId,0,MAX_FIELD_LENGTH);
 +
 +      etcdWatcher_getRootPath(pubsub_discovery->context, rootPath);
 +
 +      /* Leave room for the terminating NUL that sscanf appends to each field. */
 +      const int fieldWidth = MAX_FIELD_LENGTH - 1;
 +      if (asprintf(&expr, "/%s/%%%d[^/]/%%%d[^/]/%%%d[^/]/%%%d[^/].*", rootPath, fieldWidth, fieldWidth, fieldWidth, fieldWidth) < 0) {
 +              /* expr is not usable after a failed asprintf. */
 +              return CELIX_ENOMEM;
 +      }
 +      if(expr) {
 +              int foundItems = sscanf(etcdKey, expr, scope, topic, fwUUID, serviceId);
 +              free(expr);
 +              if (foundItems != 4) { // Could happen when a directory is removed, just don't process this.
 +                      status = CELIX_ILLEGAL_STATE;
 +              }
 +              else{
-                       status = 
pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),etcdValue,NULL,pubEP);
++
++                      // etcdValue contains the json formatted string
++                      json_error_t error;
++                      json_t* jsonRoot = json_loads(etcdValue, JSON_DECODE_ANY, &error);
++
++                      const char* endpoint_serializer = NULL;
++                      const char* endpoint_admin_type = NULL;
++                      const char* endpoint_url = NULL;
++                      const char* endpoint_type = NULL;
++
++                      if (json_is_object(jsonRoot)){
++
++                              void *iter = json_object_iter(jsonRoot);
++
++                              const char *key;
++                              json_t *value;
++
++                              // Pick the known endpoint fields out of the object; other keys are ignored.
++                              while (iter) {
++                                      key = json_object_iter_key(iter);
++                                      value = json_object_iter_value(iter);
++
++                                      if (strcmp(key, PUBSUB_ENDPOINT_SERIALIZER) == 0) {
++                                              endpoint_serializer = json_string_value(value);
++                                      } else if (strcmp(key, PUBSUB_ENDPOINT_ADMIN_TYPE) == 0) {
++                                              endpoint_admin_type = json_string_value(value);
++                                      } else if (strcmp(key, PUBSUB_ENDPOINT_URL) == 0) {
++                                              endpoint_url = json_string_value(value);
++                                      } else if (strcmp(key, PUBSUB_ENDPOINT_TYPE) == 0) {
++                                              endpoint_type = json_string_value(value);
++                                      }
++
++                                      iter = json_object_iter_next(jsonRoot, iter);
++                              }
++
++                              if (endpoint_url == NULL) {
++                                      printf("EW: No endpoint found in json object!\n");
++                                      endpoint_url = etcdValue;
++                              }
++
++                      } else {
++                              // Not a JSON object: treat the raw value as the endpoint URL.
++                              endpoint_url = etcdValue;
++                      }
++
++                      status = pubsubEndpoint_create(fwUUID,scope,topic,strtol(serviceId,NULL,10),endpoint_url,NULL,pubEP);
++
++                      if (status == CELIX_SUCCESS) {
++                              status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_SERIALIZER, endpoint_serializer);
++                              status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_ADMIN_TYPE, endpoint_admin_type);
++                              status += pubsubEndpoint_setField(*pubEP, PUBSUB_ENDPOINT_TYPE, endpoint_type);
++                      }
++
++                      // The endpoint_* strings point into jsonRoot, so it is released only after they were used.
++                      if (jsonRoot != NULL) {
++                              json_decref(jsonRoot);
++                      }
 +              }
 +      }
 +      return status;
 +}
 +
 +/*
 + * Watcher thread entry point; data is the etcd_watcher to run.
 + *
 + * First reports all publishers already stored below the watcher's
 + * scope/topic root path, then performs (blocking) etcd_watch calls to check
 + * for changing discovery endpoint information within etcd until
 + * watcher->running is cleared. Always returns NULL.
 + */
 +static void* etcdWatcher_run(void* data) {
 +      etcd_watcher_pt watcher = (etcd_watcher_pt) data;
 +      time_t timeBeforeWatch = time(NULL);
 +      char rootPath[MAX_ROOTNODE_LENGTH];
 +      long long highestModified = 0;
 +
 +      pubsub_discovery_pt ps_discovery = watcher->pubsub_discovery;
 +      bundle_context_pt context = ps_discovery->context;
 +
 +      memset(rootPath, 0, MAX_ROOTNODE_LENGTH);
 +
 +      //TODO: add topic to etcd key
 +      etcdWatcher_getTopicRootPath(context, watcher->scope, watcher->topic, 
rootPath, MAX_ROOTNODE_LENGTH);
 +      etcdWatcher_addAlreadyExistingPublishers(ps_discovery, rootPath, 
&highestModified);
 +
 +      /* The lock is taken in the loop condition to read 'running' and released again at the top of the body. */
 +      while ((celixThreadMutex_lock(&watcher->watcherLock) == CELIX_SUCCESS) 
&& watcher->running) {
 +
 +              char *rkey = NULL;
 +              char *value = NULL;
 +              char *preValue = NULL;
 +              char *action = NULL;
 +              long long modIndex;
 +
 +              celixThreadMutex_unlock(&watcher->watcherLock);
 +
 +              /* Watch for the first change after the highest modification index handled so far. */
 +              if (etcd_watch(rootPath, highestModified + 1, &action, 
&preValue, &value, &rkey, &modIndex) == 0 && action != NULL) {
 +                      pubsub_endpoint_pt pubEP = NULL;
 +                      /* set/create/update report the new value; delete/expire report the previous value as removed. */
 +                      if ((strcmp(action, "set") == 0) || (strcmp(action, 
"create") == 0)) {
 +                              if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == 
CELIX_SUCCESS) {
 +                                      pubsub_discovery_addNode(ps_discovery, 
pubEP);
 +                              }
 +                      } else if (strcmp(action, "delete") == 0) {
 +                              if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) 
== CELIX_SUCCESS) {
 +                                      
pubsub_discovery_removeNode(ps_discovery, pubEP);
 +                              }
 +                      } else if (strcmp(action, "expire") == 0) {
 +                              if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, preValue, &pubEP) 
== CELIX_SUCCESS) {
 +                                      
pubsub_discovery_removeNode(ps_discovery, pubEP);
 +                              }
 +                      } else if (strcmp(action, "update") == 0) {
 +                              if 
(etcdWatcher_getPublisherEndpointFromKey(ps_discovery, rkey, value, &pubEP) == 
CELIX_SUCCESS) {
 +                                      pubsub_discovery_addNode(ps_discovery, 
pubEP);
 +                              }
 +                      } else {
 +                              fw_log(logger, OSGI_FRAMEWORK_LOG_INFO, 
"Unexpected action: %s", action);
 +                      }
 +                      highestModified = modIndex;
 +              } else if (time(NULL) - timeBeforeWatch <= (DEFAULT_ETCD_TTL / 
4)) {
 +                      /* prevent busy waiting: the watch gave no result within a quarter of the TTL, so back off */
 +                      sleep(DEFAULT_ETCD_TTL / 4);
 +              }
 +
 +              /* NOTE(review): FREE_MEM is defined outside this chunk; presumably it frees the strings handed out by etcd_watch. */
 +              FREE_MEM(action);
 +              FREE_MEM(value);
 +              FREE_MEM(preValue);
 +              FREE_MEM(rkey);
 +
 +              /* prevent busy waiting, in case etcd_watch returns false */
 +
 +
 +              /* Restart the reference time once more than a quarter of the TTL has passed. */
 +              if (time(NULL) - timeBeforeWatch > (DEFAULT_ETCD_TTL / 4)) {
 +                      timeBeforeWatch = time(NULL);
 +              }
 +
 +      }
 +
 +      /* Leaving the loop because 'running' was cleared means the lock from the last condition check is still held. */
 +      if (watcher->running == false) {
 +              celixThreadMutex_unlock(&watcher->watcherLock);
 +      }
 +
 +      return NULL;
 +}
 +
 +/*
 + * Allocates a watcher for the given scope/topic and starts its thread.
 + *
 + * Returns CELIX_BUNDLE_EXCEPTION when pubsub_discovery is NULL, CELIX_ENOMEM
 + * when the watcher cannot be allocated (*watcher is then NULL), otherwise the
 + * status of celixThread_create(). The watcher is marked as running only when
 + * the thread was created; the context argument is not used here.
 + */
 +celix_status_t etcdWatcher_create(pubsub_discovery_pt pubsub_discovery, bundle_context_pt context, const char *scope, const char *topic, etcd_watcher_pt *watcher) {
 +      if (pubsub_discovery == NULL) {
 +              return CELIX_BUNDLE_EXCEPTION;
 +      }
 +
 +      etcd_watcher_pt created = calloc(1, sizeof(struct etcd_watcher));
 +      *watcher = created;
 +      if (created == NULL) {
 +              return CELIX_ENOMEM;
 +      }
 +
 +      created->pubsub_discovery = pubsub_discovery;
 +      created->scope = strdup(scope);
 +      created->topic = strdup(topic);
 +
 +      celixThreadMutex_create(&created->watcherLock, NULL);
 +
 +      /* The lock is held across thread creation and the update of 'running'. */
 +      celixThreadMutex_lock(&created->watcherLock);
 +
 +      celix_status_t status = celixThread_create(&created->watcherThread, NULL, etcdWatcher_run, created);
 +      if (status == CELIX_SUCCESS) {
 +              created->running = true;
 +      }
 +
 +      celixThreadMutex_unlock(&created->watcherLock);
 +
 +      return status;
 +}
 +
 +/*
 + * Releases a watcher: destroys its mutex and frees its scope and topic copies
 + * and the watcher itself. This function does not stop or join the watcher
 + * thread. Always returns CELIX_SUCCESS.
 + */
 +celix_status_t etcdWatcher_destroy(etcd_watcher_pt watcher) {
 +
 +      celix_status_t status = CELIX_SUCCESS;
 +
 +      /* The root path that used to be computed here was never used, so it is no longer built. */
 +      celixThreadMutex_destroy(&(watcher->watcherLock));
 +
 +      free(watcher->scope);
 +      free(watcher->topic);
 +      free(watcher);
 +
 +      return status;
 +}
 +
 +/*
 + * Asks the watcher thread to stop by clearing 'running' under watcherLock,
 + * then joins the thread. Always returns CELIX_SUCCESS.
 + */
 +celix_status_t etcdWatcher_stop(etcd_watcher_pt watcher){
 +      celix_thread_mutex_t *lock = &watcher->watcherLock;
 +
 +      celixThreadMutex_lock(lock);
 +      watcher->running = false;
 +      celixThreadMutex_unlock(lock);
 +
 +      celixThread_join(watcher->watcherThread, NULL);
 +
 +      return CELIX_SUCCESS;
 +}
 +
 +

http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/etcd_writer.c
----------------------------------------------------------------------
diff --cc pubsub/pubsub_discovery/src/etcd_writer.c
index 1c423f3,0000000..e820e50
mode 100644,000000..100644
--- a/pubsub/pubsub_discovery/src/etcd_writer.c
+++ b/pubsub/pubsub_discovery/src/etcd_writer.c
@@@ -1,189 -1,0 +1,221 @@@
 +/**
 + *Licensed to the Apache Software Foundation (ASF) under one
 + *or more contributor license agreements.  See the NOTICE file
 + *distributed with this work for additional information
 + *regarding copyright ownership.  The ASF licenses this file
 + *to you under the Apache License, Version 2.0 (the
 + *"License"); you may not use this file except in compliance
 + *with the License.  You may obtain a copy of the License at
 + *
 + *  http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *Unless required by applicable law or agreed to in writing,
 + *software distributed under the License is distributed on an
 + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + *specific language governing permissions and limitations
 + *under the License.
 + */
 +
 +#include <stdbool.h>
 +#include <stdlib.h>
 +#include <unistd.h>
 +#include <string.h>
++#include <jansson.h>
 +
 +#include "celix_log.h"
 +#include "constants.h"
 +
 +#include "etcd.h"
 +#include "etcd_writer.h"
 +
 +#include "pubsub_discovery.h"
 +#include "pubsub_discovery_impl.h"
 +
 +#define MAX_ROOTNODE_LENGTH           128 /* size in bytes of the stack buffer used to build an etcd directory path */
 +/* Bundle-context property naming the etcd root directory, and the root used when it is unset. */
 +#define CFG_ETCD_ROOT_PATH            "PUBSUB_DISCOVERY_ETCD_ROOT_PATH"
 +#define DEFAULT_ETCD_ROOTPATH "pubsub/discovery"
 +/* etcd server address: property name and default. Not referenced in the visible part of this file - presumably read where the etcd client is set up. */
 +#define CFG_ETCD_SERVER_IP            "PUBSUB_DISCOVERY_ETCD_SERVER_IP"
 +#define DEFAULT_ETCD_SERVER_IP        "127.0.0.1"
 +/* etcd server port: property name and default. Same remark as for the server address. */
 +#define CFG_ETCD_SERVER_PORT  "PUBSUB_DISCOVERY_ETCD_SERVER_PORT"
 +#define DEFAULT_ETCD_SERVER_PORT 2379
 +/* Time-to-live handed to etcd_set() for every endpoint key: property name and default. */
 +// Be careful: the TTL should be higher than the curl timeout. The writer thread sleeps DEFAULT_ETCD_TTL / 2 between refreshes.
 +#define CFG_ETCD_TTL   "DISCOVERY_ETCD_TTL"
 +#define DEFAULT_ETCD_TTL 30
 +/**
 + * State of the etcd writer: the list of locally published endpoints and the
 + * background thread that periodically writes them to etcd again.
 + */
 +struct etcd_writer {
 +      pubsub_discovery_pt pubsub_discovery; /* owning discovery instance; its bundle context is used to read configuration properties */
 +      celix_thread_mutex_t localPubsLock; /* held while localPubs is read or modified */
 +      array_list_pt localPubs; /* clones of the publisher endpoints stored by etcdWriter_addPublisherEndpoint() */
 +      volatile bool running; /* loop condition of etcdWriter_run(); NOTE(review): volatile alone gives no inter-thread ordering guarantees */
 +      celix_thread_t writerThread; /* thread executing etcdWriter_run() */
 +};
 +
 +
 +static const char* etcdWriter_getRootPath(bundle_context_pt context); /* configured etcd root path, or DEFAULT_ETCD_ROOTPATH when unset */
 +static void* etcdWriter_run(void* data); /* writer thread body; data is the etcd_writer_pt */
 +
 +
 +/**
 + * Allocates a writer, initialises its endpoint list and lock, and starts the
 + * background thread (etcdWriter_run) that refreshes the stored endpoints.
 + *
 + * @param disc  discovery instance the writer belongs to; stored, not copied
 + * @return the new writer, or NULL when the allocation fails or the writer
 + *         thread cannot be started
 + */
 +etcd_writer_pt etcdWriter_create(pubsub_discovery_pt disc) {
 +      etcd_writer_pt writer = calloc(1, sizeof(*writer));
 +      if (writer == NULL) {
 +              return NULL;
 +      }
 +
 +      celixThreadMutex_create(&writer->localPubsLock, NULL);
 +      arrayList_create(&writer->localPubs);
 +      writer->pubsub_discovery = disc;
 +      /* Must be set before the thread starts: etcdWriter_run() exits as soon as it reads false. */
 +      writer->running = true;
 +
 +      if (celixThread_create(&writer->writerThread, NULL, etcdWriter_run, writer) != CELIX_SUCCESS) {
 +              /* Without a thread, etcdWriter_destroy() would join a handle that was
 +               * never started, so undo everything and report failure instead. */
 +              writer->running = false;
 +              arrayList_destroy(writer->localPubs);
 +              celixThreadMutex_destroy(&writer->localPubsLock);
 +              free(writer);
 +              return NULL;
 +      }
 +
 +      return writer;
 +}
 +
 +/**
 + * Stops the writer thread, deletes the etcd directory
 + * <root>/<scope>/<topic>/<frameworkUUID> of every locally stored publisher
 + * endpoint, destroys those endpoints and frees the writer.
 + *
 + * The join can take a while: the writer thread sleeps DEFAULT_ETCD_TTL / 2
 + * seconds between refresh rounds and only re-checks the flag after waking up.
 + *
 + * @param writer  writer to tear down; dereferenced unconditionally
 + */
 +void etcdWriter_destroy(etcd_writer_pt writer) {
 +      char dir[MAX_ROOTNODE_LENGTH];
 +      const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
 +
 +      writer->running = false;
 +      celixThread_join(writer->writerThread, NULL);
 +
 +      celixThreadMutex_lock(&writer->localPubsLock);
 +      for(int i = 0; i < arrayList_size(writer->localPubs); i++) {
 +              pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(writer->localPubs,i);
 +              memset(dir,0,MAX_ROOTNODE_LENGTH);
-               snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID);
++              int len = snprintf(dir,MAX_ROOTNODE_LENGTH,"%s/%s/%s/%s",
++                               rootPath,
++                               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++                               properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                               properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID));
++
++              /* snprintf() returns the length the full path would have had. A value
++               * >= the buffer size means dir holds only a prefix of the intended
++               * path, and deleting that prefix could remove an unrelated directory. */
++              if (len > 0 && len < MAX_ROOTNODE_LENGTH) {
 +                      etcd_del(dir);
++              }
 +              pubsubEndpoint_destroy(pubEP);
 +      }
 +      arrayList_destroy(writer->localPubs);
 +
 +      celixThreadMutex_unlock(&writer->localPubsLock);
 +      celixThreadMutex_destroy(&(writer->localPubsLock));
 +
 +      free(writer);
 +}
 +
 +/**
 + * Writes a publisher endpoint to etcd as a compact JSON document under
 + * <root>/<scope>/<topic>/<frameworkUUID>/<serviceID>, using the configured
 + * TTL (property CFG_ETCD_TTL, falling back to DEFAULT_ETCD_TTL).
 + *
 + * @param writer   writer providing the bundle context and the local list
 + * @param pubEP    endpoint to announce; it is not modified
 + * @param storeEP  when true and the endpoint's framework UUID equals this
 + *                 framework's UUID, a clone of pubEP is appended to
 + *                 writer->localPubs (taking localPubsLock). The writer thread
 + *                 passes false because it already holds that lock.
 + * @return CELIX_ILLEGAL_ARGUMENT when the `!etcd_set(...)` test is true or the
 + *         JSON document cannot be built; CELIX_BUNDLE_EXCEPTION otherwise.
 + *         NOTE(review): no path ever yields CELIX_SUCCESS - status starts as
 + *         CELIX_BUNDLE_EXCEPTION and is only ever overwritten with
 + *         CELIX_ILLEGAL_ARGUMENT. Left unchanged here because the callers
 + *         are not visible; confirm the intended contract (and the polarity
 + *         of etcd_set's return value) before relying on the result.
 + */
 +celix_status_t etcdWriter_addPublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP, bool storeEP){
 +      celix_status_t status = CELIX_BUNDLE_EXCEPTION;
 +
 +      if(storeEP){
 +              const char *fwUUID = NULL;
 +              bundleContext_getProperty(writer->pubsub_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID);
-               if(fwUUID && strcmp(pubEP->frameworkUUID, fwUUID) == 0) {
++              /* Guard the lookup result as well: passing NULL to strcmp() is undefined
++               * (presumably what properties_get() yields when the property is absent). */
++              const char *epUUID = properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID);
++              if(fwUUID && epUUID && strcmp(epUUID, fwUUID) == 0) {
 +                      celixThreadMutex_lock(&writer->localPubsLock);
 +                      pubsub_endpoint_pt p = NULL;
 +                      pubsubEndpoint_clone(pubEP, &p);
 +                      arrayList_add(writer->localPubs,p);
 +                      celixThreadMutex_unlock(&writer->localPubsLock);
 +              }
 +      }
 +
 +      char *key = NULL;
 +
 +      const char* ttlStr = NULL;
 +      int ttl = 0;
 +
 +      // determine ttl
 +      if ((bundleContext_getProperty(writer->pubsub_discovery->context, CFG_ETCD_TTL, &ttlStr) != CELIX_SUCCESS) || !ttlStr) {
 +              ttl = DEFAULT_ETCD_TTL;
 +      } else {
 +              char* endptr = NULL;
 +              errno = 0;
 +              ttl = strtol(ttlStr, &endptr, 10);
 +              if (*endptr || errno != 0) {
 +                      ttl = DEFAULT_ETCD_TTL;
 +              }
 +      }
 +
 +      const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
 +
-       asprintf(&key,"%s/%s/%s/%s/%ld",rootPath,pubEP->scope,pubEP->topic,pubEP->frameworkUUID,pubEP->serviceID);
- 
-       if(!etcd_set(key,pubEP->endpoint,ttl,false)){
++      if (asprintf(&key,"%s/%s/%s/%s/%ld",
++                       rootPath,
++                       properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++                       properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                       properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                       pubEP->serviceID) < 0) {
++              /* After a failed asprintf() the pointer is indeterminate: neither use nor free it. */
++              return status;
++      }
++
++      /* The buffer holds the DECIMAL TEXT of the id, so it must be sized for
++       * digits, not for the integer's byte width. Sizing it with
++       * sizeof(pubEP->serviceID) (8 for a 64-bit long) cut ids off after
++       * 7 digits. 32 bytes fit any value printable with %ld. */
++      char serviceID[32];
++      snprintf(serviceID, sizeof(serviceID), "%ld", pubEP->serviceID);
++      json_t* jsonEndpoint = json_pack("{s:s,s:s,s:s,s:s,s:s,s:s,s:s}",
++                       PUBSUB_ENDPOINT_SERVICE_ID, serviceID,
++                       PUBSUB_ENDPOINT_SERIALIZER, "serializer.json", //TODO: Serializer not (yet) stored in endpoint
++                       PUBSUB_ENDPOINT_ADMIN_TYPE, "zmq", //TODO: PSA type not (yet) stored in endpoint
++                       PUBSUB_ENDPOINT_URL, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL),
++                       PUBSUB_ENDPOINT_TYPE, "publisher", //TODO: Check if necessary
++                       PUBSUB_ENDPOINT_TOPIC, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                       PUBSUB_ENDPOINT_SCOPE, properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE)
++      );
++      /* json_pack() returns NULL on error (e.g. a NULL string argument); do not serialise or store in that case. */
++      char* jsonEndpointStr = (jsonEndpoint != NULL) ? json_dumps(jsonEndpoint, JSON_COMPACT) : NULL;
++
++      if (jsonEndpointStr == NULL || !etcd_set(key,jsonEndpointStr,ttl,false)) {
 +              status = CELIX_ILLEGAL_ARGUMENT;
 +      }
 +      FREE_MEM(key);
++      FREE_MEM(jsonEndpointStr);
++      json_decref(jsonEndpoint);
++
 +      return status;
 +}
 +
 +/**
 + * Removes a publisher endpoint: drops (and destroys) the matching clone from
 + * writer->localPubs, then deletes the key
 + * <root>/<scope>/<topic>/<frameworkUUID>/<serviceID> from etcd.
 + *
 + * @param writer  writer holding the local endpoint list
 + * @param pubEP   endpoint to remove; compared with pubsubEndpoint_equals()
 + * @return CELIX_SUCCESS when etcd_del() returns zero;
 + *         CELIX_ILLEGAL_ARGUMENT when it returns non-zero;
 + *         CELIX_ENOMEM when the key string cannot be allocated
 + */
 +celix_status_t etcdWriter_deletePublisherEndpoint(etcd_writer_pt writer, pubsub_endpoint_pt pubEP) {
 +      celix_status_t status = CELIX_SUCCESS;
 +      char *key = NULL;
 +
 +      const char *rootPath = etcdWriter_getRootPath(writer->pubsub_discovery->context);
 +
-       asprintf(&key, "%s/%s/%s/%s/%ld", rootPath, pubEP->scope, pubEP->topic, pubEP->frameworkUUID, pubEP->serviceID);
++      if (asprintf(&key, "%s/%s/%s/%s/%ld",
++                       rootPath,
++                       properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),
++                       properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC),
++                       properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),
++                       pubEP->serviceID) < 0) {
++              /* The pointer is indeterminate after a failed asprintf(); normalise it. */
++              key = NULL;
++      }
 +
 +      /* The local bookkeeping does not need the key, so it runs even if the allocation failed. */
 +      celixThreadMutex_lock(&writer->localPubsLock);
 +      for (unsigned int i = 0; i < arrayList_size(writer->localPubs); i++) {
 +              pubsub_endpoint_pt ep = arrayList_get(writer->localPubs, i);
 +              if (pubsubEndpoint_equals(ep, pubEP)) {
 +                      arrayList_remove(writer->localPubs, i);
 +                      pubsubEndpoint_destroy(ep);
 +                      break;
 +              }
 +      }
 +      celixThreadMutex_unlock(&writer->localPubsLock);
 +
++      if (key == NULL) {
++              return CELIX_ENOMEM;
++      }
++
 +      if (etcd_del(key)) {
 +              printf("Failed to remove key %s from ETCD\n",key);
 +              status = CELIX_ILLEGAL_ARGUMENT;
 +      }
 +      FREE_MEM(key);
 +      return status;
 +}
 +
 +/**
 + * Writer thread body: while the running flag is set, re-announces every
 + * endpoint in localPubs and then sleeps for half of DEFAULT_ETCD_TTL seconds.
 + *
 + * NOTE(review): the sleep is derived from the default TTL, whereas the TTL
 + * given to etcd may come from the CFG_ETCD_TTL property - confirm that a
 + * configured TTL below the sleep interval is not expected.
 + *
 + * @param data  the etcd_writer_pt passed to celixThread_create()
 + * @return always NULL
 + */
 +static void* etcdWriter_run(void* data) {
 +      etcd_writer_pt self = (etcd_writer_pt)data;
 +
 +      for (;;) {
 +              if (!self->running) {
 +                      break;
 +              }
 +
 +              celixThreadMutex_lock(&self->localPubsLock);
 +              /* storeEP is false: the endpoints are already in the list and the lock is already held. */
 +              for (int idx = 0; idx < arrayList_size(self->localPubs); ++idx) {
 +                      pubsub_endpoint_pt localEP = (pubsub_endpoint_pt)arrayList_get(self->localPubs, idx);
 +                      etcdWriter_addPublisherEndpoint(self, localEP, false);
 +              }
 +              celixThreadMutex_unlock(&self->localPubsLock);
 +
 +              sleep(DEFAULT_ETCD_TTL / 2);
 +      }
 +
 +      return NULL;
 +}
 +
 +/**
 + * Looks up the etcd root path in the bundle context.
 + *
 + * @param context  bundle context queried for CFG_ETCD_ROOT_PATH
 + * @return the configured value, or DEFAULT_ETCD_ROOTPATH when the lookup
 + *         leaves the result NULL; the string is not owned by the caller
 + */
 +static const char* etcdWriter_getRootPath(bundle_context_pt context) {
 +      const char* configured = NULL;
 +
 +      bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configured);
 +
 +      return (configured != NULL) ? configured : DEFAULT_ETCD_ROOTPATH;
 +}
 +

Reply via email to