http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_discovery/src/pubsub_discovery_impl.c index 94a8e11,0000000..e3e9704 mode 100644,000000..100644 --- a/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c +++ b/pubsub/pubsub_discovery/src/pubsub_discovery_impl.c @@@ -1,457 -1,0 +1,460 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <stdbool.h> +#include <netdb.h> +#include <netinet/in.h> + +#include "constants.h" +#include "celix_threads.h" +#include "bundle_context.h" +#include "array_list.h" +#include "utils.h" +#include "celix_errno.h" +#include "filter.h" +#include "service_reference.h" +#include "service_registration.h" + +#include "publisher_endpoint_announce.h" +#include "etcd_common.h" +#include "etcd_watcher.h" +#include "etcd_writer.h" +#include "pubsub_endpoint.h" +#include "pubsub_discovery_impl.h" + +/* Discovery activator functions */ +celix_status_t pubsub_discovery_create(bundle_context_pt context, pubsub_discovery_pt *ps_discovery) { + celix_status_t status = CELIX_SUCCESS; + + *ps_discovery = calloc(1, sizeof(**ps_discovery)); + + if (*ps_discovery == NULL) { + status = CELIX_ENOMEM; + } + else{ + (*ps_discovery)->context = context; + (*ps_discovery)->discoveredPubs = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*ps_discovery)->listenerReferences = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); + (*ps_discovery)->watchers = hashMap_create(utils_stringHash,NULL,utils_stringEquals, NULL); + celixThreadMutex_create(&(*ps_discovery)->listenerReferencesMutex, NULL); + celixThreadMutex_create(&(*ps_discovery)->discoveredPubsMutex, NULL); + celixThreadMutex_create(&(*ps_discovery)->watchersMutex, NULL); + } + + return status; +} + +celix_status_t pubsub_discovery_destroy(pubsub_discovery_pt ps_discovery) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex); + + hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->discoveredPubs); + + while (hashMapIterator_hasNext(iter)) { + array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter); + + for(int i=0; i < arrayList_size(pubEP_list); i++) { + pubsubEndpoint_destroy(((pubsub_endpoint_pt)arrayList_get(pubEP_list,i))); + } + arrayList_destroy(pubEP_list); + } + + hashMapIterator_destroy(iter); + + hashMap_destroy(ps_discovery->discoveredPubs, true, false); + ps_discovery->discoveredPubs = NULL; + + celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex); + + celixThreadMutex_destroy(&ps_discovery->discoveredPubsMutex); + + + celixThreadMutex_lock(&ps_discovery->listenerReferencesMutex); + + hashMap_destroy(ps_discovery->listenerReferences, false, false); + ps_discovery->listenerReferences = NULL; + + celixThreadMutex_unlock(&ps_discovery->listenerReferencesMutex); + + celixThreadMutex_destroy(&ps_discovery->listenerReferencesMutex); + + free(ps_discovery); + + return status; +} + +celix_status_t pubsub_discovery_start(pubsub_discovery_pt ps_discovery) { + celix_status_t status = CELIX_SUCCESS; + status = etcdCommon_init(ps_discovery->context); + ps_discovery->writer = etcdWriter_create(ps_discovery); + + return status; +} + +celix_status_t pubsub_discovery_stop(pubsub_discovery_pt ps_discovery) { + celix_status_t status = CELIX_SUCCESS; + + const char* fwUUID = NULL; + + bundleContext_getProperty(ps_discovery->context, OSGI_FRAMEWORK_FRAMEWORK_UUID, &fwUUID); + if (fwUUID == NULL) { + printf("PSD: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + + celixThreadMutex_lock(&ps_discovery->watchersMutex); + + hash_map_iterator_pt iter = hashMapIterator_create(ps_discovery->watchers); + while (hashMapIterator_hasNext(iter)) { + struct watcher_info * wi = hashMapIterator_nextValue(iter); + etcdWatcher_stop(wi->watcher); + } + hashMapIterator_destroy(iter); + + celixThreadMutex_lock(&ps_discovery->discoveredPubsMutex); + + /* Unexport all publishers for the local framework, and also delete from ETCD publisher belonging to the local framework */ + + iter = hashMapIterator_create(ps_discovery->discoveredPubs); + while (hashMapIterator_hasNext(iter)) { + array_list_pt pubEP_list = (array_list_pt) hashMapIterator_nextValue(iter); + + int i; + for (i = 0; i < arrayList_size(pubEP_list); i++) { + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt) arrayList_get(pubEP_list, i); - if (strcmp(pubEP->frameworkUUID, fwUUID) == 0) { ++ if (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), fwUUID) == 0) { + etcdWriter_deletePublisherEndpoint(ps_discovery->writer, pubEP); + } else { + pubsub_discovery_informPublishersListeners(ps_discovery, pubEP, false); + arrayList_remove(pubEP_list, i); + pubsubEndpoint_destroy(pubEP); + i--; + } + } + } + + hashMapIterator_destroy(iter); + + celixThreadMutex_unlock(&ps_discovery->discoveredPubsMutex); + etcdWriter_destroy(ps_discovery->writer); + + iter = hashMapIterator_create(ps_discovery->watchers); + while (hashMapIterator_hasNext(iter)) { + struct watcher_info * wi = hashMapIterator_nextValue(iter); + etcdWatcher_destroy(wi->watcher); + } + hashMapIterator_destroy(iter); + hashMap_destroy(ps_discovery->watchers, true, true); + celixThreadMutex_unlock(&ps_discovery->watchersMutex); + return status; +} + +/* Functions called by the etcd_watcher */ + +celix_status_t pubsub_discovery_addNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; + bool inform=false; + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + - char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic); ++ char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pubs_key); + if(pubEP_list==NULL){ + arrayList_create(&pubEP_list); + arrayList_add(pubEP_list,pubEP); + hashMap_put(pubsub_discovery->discoveredPubs,strdup(pubs_key),pubEP_list); + inform=true; + } + else{ + int i; + bool found = false; + for(i=0;i<arrayList_size(pubEP_list) && !found;i++){ + found = pubsubEndpoint_equals(pubEP,(pubsub_endpoint_pt)arrayList_get(pubEP_list,i)); + } + if(found){ + pubsubEndpoint_destroy(pubEP); + } + else{ + arrayList_add(pubEP_list,pubEP); + inform=true; + } + } + free(pubs_key); + + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + + if(inform){ + status = pubsub_discovery_informPublishersListeners(pubsub_discovery,pubEP,true); + } + + return status; +} + +celix_status_t pubsub_discovery_removeNode(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; + pubsub_endpoint_pt p = NULL; + bool found = false; + + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); - char *pubs_key = createScopeTopicKey(pubEP->scope, pubEP->topic); ++ char *pubs_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pubEP_list = (array_list_pt) hashMap_get(pubsub_discovery->discoveredPubs, pubs_key); + free(pubs_key); + if (pubEP_list == NULL) { - printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", pubEP->topic); ++ printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n", ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + status = CELIX_ILLEGAL_STATE; + } else { + int i; + + for (i = 0; !found && i < arrayList_size(pubEP_list); i++) { + p = arrayList_get(pubEP_list, i); + found = pubsubEndpoint_equals(pubEP, p); + if (found) { + arrayList_remove(pubEP_list, i); + pubsubEndpoint_destroy(p); + } + } + } + + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + if (found) { + status = pubsub_discovery_informPublishersListeners(pubsub_discovery, pubEP, false); + } + pubsubEndpoint_destroy(pubEP); + + return status; +} + +/* Callback to the pubsub_topology_manager */ +celix_status_t pubsub_discovery_informPublishersListeners(pubsub_discovery_pt pubsub_discovery, pubsub_endpoint_pt pubEP, bool epAdded) { + celix_status_t status = CELIX_SUCCESS; + + // Inform listeners of new publisher endpoint + celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); + + if (pubsub_discovery->listenerReferences != NULL) { + hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->listenerReferences); + while (hashMapIterator_hasNext(iter)) { + service_reference_pt reference = hashMapIterator_nextKey(iter); + + publisher_endpoint_announce_pt listener = NULL; + + bundleContext_getService(pubsub_discovery->context, reference, (void**) &listener); + if (epAdded) { + listener->announcePublisher(listener->handle, pubEP); + } else { + listener->removePublisher(listener->handle, pubEP); + } + bundleContext_ungetService(pubsub_discovery->context, reference, NULL); + } + hashMapIterator_destroy(iter); + } + + celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); + + return status; +} + + +/* Service's functions implementation */ +celix_status_t pubsub_discovery_announcePublisher(void *handle, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; - printf("pubsub_discovery_announcePublisher : %s / %s\n", pubEP->topic, pubEP->endpoint); ++ printf("pubsub_discovery_announcePublisher : %s / %s\n", ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + - char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic); ++ char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); + + if(pubEP_list==NULL){ + arrayList_create(&pubEP_list); + hashMap_put(pubsub_discovery->discoveredPubs,strdup(pub_key),pubEP_list); + } + free(pub_key); + pubsub_endpoint_pt p = NULL; + pubsubEndpoint_clone(pubEP, &p); + + arrayList_add(pubEP_list,p); + + status = etcdWriter_addPublisherEndpoint(pubsub_discovery->writer,p,true); + + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + + return status; +} + +celix_status_t pubsub_discovery_removePublisher(void *handle, pubsub_endpoint_pt pubEP) { + celix_status_t status = CELIX_SUCCESS; + + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + - char *pub_key = createScopeTopicKey(pubEP->scope,pubEP->topic); ++ char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pubEP_list = (array_list_pt)hashMap_get(pubsub_discovery->discoveredPubs,pub_key); + free(pub_key); + if(pubEP_list==NULL){ - printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",pubEP->topic); ++ printf("PSD: Cannot find any registered publisher for topic %s. Something is not consistent.\n",properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + status = CELIX_ILLEGAL_STATE; + } + else{ + + int i; + bool found = false; + pubsub_endpoint_pt p = NULL; + + for(i=0;!found && i<arrayList_size(pubEP_list);i++){ + p = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); + found = pubsubEndpoint_equals(pubEP,p); + } + + if(!found){ + printf("PSD: Trying to remove a not existing endpoint. Something is not consistent.\n"); + status = CELIX_ILLEGAL_STATE; + } + else{ + + arrayList_removeElement(pubEP_list,p); + + status = etcdWriter_deletePublisherEndpoint(pubsub_discovery->writer,p); + + pubsubEndpoint_destroy(p); + } + } + + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + + return status; +} + +celix_status_t pubsub_discovery_interestedInTopic(void *handle, const char* scope, const char* topic) { + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + + char *scope_topic_key = createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&pubsub_discovery->watchersMutex); + struct watcher_info * wi = hashMap_get(pubsub_discovery->watchers, scope_topic_key); + if(wi) { + wi->nr_references++; + free(scope_topic_key); + } else { + wi = calloc(1, sizeof(*wi)); + etcdWatcher_create(pubsub_discovery, pubsub_discovery->context, scope, topic, &wi->watcher); + wi->nr_references = 1; + hashMap_put(pubsub_discovery->watchers, scope_topic_key, wi); + } + + celixThreadMutex_unlock(&pubsub_discovery->watchersMutex); + + return CELIX_SUCCESS; +} + +celix_status_t pubsub_discovery_uninterestedInTopic(void *handle, const char* scope, const char* topic) { + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt) handle; + + char *scope_topic_key = createScopeTopicKey(scope, topic); + celixThreadMutex_lock(&pubsub_discovery->watchersMutex); + + hash_map_entry_pt entry = hashMap_getEntry(pubsub_discovery->watchers, scope_topic_key); + if(entry) { + struct watcher_info * wi = hashMapEntry_getValue(entry); + wi->nr_references--; + if(wi->nr_references == 0) { + char *key = hashMapEntry_getKey(entry); + hashMap_remove(pubsub_discovery->watchers, scope_topic_key); + free(key); + free(scope_topic_key); + etcdWatcher_stop(wi->watcher); + etcdWatcher_destroy(wi->watcher); + free(wi); + } + } else { + fprintf(stderr, "[DISC] Inconsistency error: Removing unknown topic %s\n", topic); + } + celixThreadMutex_unlock(&pubsub_discovery->watchersMutex); + return CELIX_SUCCESS; +} + +/* pubsub_topology_manager tracker callbacks */ + +celix_status_t pubsub_discovery_tmPublisherAnnounceAdded(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + + pubsub_discovery_pt pubsub_discovery = (pubsub_discovery_pt)handle; + publisher_endpoint_announce_pt listener = (publisher_endpoint_announce_pt)service; + + celixThreadMutex_lock(&pubsub_discovery->discoveredPubsMutex); + celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); + + /* Notify the PSTM about discovered publisher endpoints */ + hash_map_iterator_pt iter = hashMapIterator_create(pubsub_discovery->discoveredPubs); + while(hashMapIterator_hasNext(iter)){ + array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); + int i; + for(i=0;i<arrayList_size(pubEP_list);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); + status += listener->announcePublisher(listener->handle, pubEP); + } + } + + hashMapIterator_destroy(iter); + + hashMap_put(pubsub_discovery->listenerReferences, reference, NULL); + + celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); + celixThreadMutex_unlock(&pubsub_discovery->discoveredPubsMutex); + + printf("PSD: pubsub_tm_announce_publisher added.\n"); + + return status; +} + +celix_status_t pubsub_discovery_tmPublisherAnnounceModified(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + + status = pubsub_discovery_tmPublisherAnnounceRemoved(handle, reference, service); + if (status == CELIX_SUCCESS) { + status = pubsub_discovery_tmPublisherAnnounceAdded(handle, reference, service); + } + + return status; +} + +celix_status_t pubsub_discovery_tmPublisherAnnounceRemoved(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + pubsub_discovery_pt pubsub_discovery = handle; + + celixThreadMutex_lock(&pubsub_discovery->listenerReferencesMutex); + + if (pubsub_discovery->listenerReferences != NULL) { + if (hashMap_remove(pubsub_discovery->listenerReferences, reference)) { + printf("PSD: pubsub_tm_announce_publisher removed.\n"); + } + } + celixThreadMutex_unlock(&pubsub_discovery->listenerReferencesMutex); + + return status; +} +
http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_spi/include/pubsub_endpoint.h ---------------------------------------------------------------------- diff --cc pubsub/pubsub_spi/include/pubsub_endpoint.h index 4c39d2f,0000000..598d673 mode 100644,000000..100644 --- a/pubsub/pubsub_spi/include/pubsub_endpoint.h +++ b/pubsub/pubsub_spi/include/pubsub_endpoint.h @@@ -1,58 -1,0 +1,65 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_endpoint.h + * + * \date Sep 21, 2015 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PUBSUB_ENDPOINT_H_ +#define PUBSUB_ENDPOINT_H_ + +#include "service_reference.h" +#include "listener_hook_service.h" +#include "properties.h" + +#include "pubsub/publisher.h" +#include "pubsub/subscriber.h" + ++#define PUBSUB_ENDPOINT_ID "pubsub.endpoint.id" ++#define PUBSUB_ENDPOINT_SERVICE_ID "service.id" ++#define PUBSUB_ENDPOINT_SERIALIZER "serializer" ++#define PUBSUB_ENDPOINT_ADMIN_TYPE "pubsub.admin.type" ++#define PUBSUB_ENDPOINT_URL "pubsub.endpoint" ++#define PUBSUB_ENDPOINT_TOPIC "pubsub.topic" ++#define PUBSUB_ENDPOINT_SCOPE "pubsub.scope" ++#define PUBSUB_ENDPOINT_TYPE "pubsub.type" ++ +struct pubsub_endpoint { - char *frameworkUUID; - char *scope; - char *topic; - long serviceID; - char* endpoint; - bool is_secure; ++ long serviceID; //optional ++ bool is_secure; //optional ++ properties_pt endpoint_props; + properties_pt topic_props; +}; + +typedef struct pubsub_endpoint *pubsub_endpoint_pt; + +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp); +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference,pubsub_endpoint_pt* psEp, bool isPublisher); +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher); +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out); +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp); +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2); ++celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value); + +char *createScopeTopicKey(const char* scope, const char* topic); + +#endif /* PUBSUB_ENDPOINT_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_spi/src/pubsub_endpoint.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_spi/src/pubsub_endpoint.c index c3fd293,0000000..d3b746e mode 100644,000000..100644 --- a/pubsub/pubsub_spi/src/pubsub_endpoint.c +++ b/pubsub/pubsub_spi/src/pubsub_endpoint.c @@@ -1,254 -1,0 +1,282 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * endpoint_description.c + * + * \date 25 Jul 2014 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#include <string.h> +#include <stdlib.h> ++#include <uuid/uuid.h> + +#include "celix_errno.h" +#include "celix_log.h" + +#include "pubsub_common.h" +#include "pubsub_endpoint.h" +#include "constants.h" + +#include "pubsub_utils.h" + + +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps); +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher); + +static void pubsubEndpoint_setFields(pubsub_endpoint_pt psEp, const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props, bool cloneProps){ + ++ if (psEp->endpoint_props == NULL) { ++ psEp->endpoint_props = properties_create(); ++ } ++ ++ char endpointUuid[37]; ++ ++ uuid_t endpointUid; ++ uuid_generate(endpointUid); ++ uuid_unparse(endpointUid, endpointUuid); ++ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_ID, endpointUuid); ++ + if (fwUUID != NULL) { - psEp->frameworkUUID = strdup(fwUUID); ++ properties_set(psEp->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID, fwUUID); + } + + if (scope != NULL) { - psEp->scope = strdup(scope); ++ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE, scope); + } + + if (topic != NULL) { - psEp->topic = strdup(topic); ++ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC, topic); + } + + psEp->serviceID = serviceId; + + if(endpoint != NULL) { - psEp->endpoint = strdup(endpoint); ++ properties_set(psEp->endpoint_props, PUBSUB_ENDPOINT_URL, endpoint); + } + + if(topic_props != NULL){ + if(cloneProps){ + properties_copy(topic_props, &(psEp->topic_props)); + } + else{ + psEp->topic_props = topic_props; + } + } +} + +static properties_pt pubsubEndpoint_getTopicProperties(bundle_pt bundle, const char *topic, bool isPublisher){ + + properties_pt topic_props = NULL; + + bool isSystemBundle = false; + bundle_isSystemBundle(bundle, &isSystemBundle); + long bundleId = -1; + bundle_isSystemBundle(bundle, &isSystemBundle); + bundle_getBundleId(bundle,&bundleId); + + if(isSystemBundle == false) { + + char *bundleRoot = NULL; + char* topicPropertiesPath = NULL; + bundle_getEntry(bundle, ".", &bundleRoot); + + if(bundleRoot != NULL){ + + asprintf(&topicPropertiesPath, "%s/META-INF/topics/%s/%s.properties", bundleRoot, isPublisher?"pub":"sub", topic); + topic_props = properties_load(topicPropertiesPath); + if(topic_props==NULL){ + printf("PSEP: Could not load properties for %s on topic %s, bundleId=%ld\n", isPublisher?"publication":"subscription", topic,bundleId); + } + + free(topicPropertiesPath); + free(bundleRoot); + } + } + + return topic_props; +} + ++celix_status_t pubsubEndpoint_setField(pubsub_endpoint_pt ep, const char* key, const char* value) { ++ celix_status_t status = CELIX_SUCCESS; ++ ++ if (ep->endpoint_props == NULL) { ++ printf("PUBSUB_EP: No endpoint_props for endpoint available!\n"); ++ return CELIX_ILLEGAL_STATE; ++ } ++ ++ if (key != NULL && value != NULL) { ++ properties_set(ep->endpoint_props, key, value); ++ } ++ ++ return status; ++} ++ +celix_status_t pubsubEndpoint_create(const char* fwUUID, const char* scope, const char* topic, long serviceId,const char* endpoint,properties_pt topic_props,pubsub_endpoint_pt* psEp){ + celix_status_t status = CELIX_SUCCESS; + + *psEp = calloc(1, sizeof(**psEp)); + + pubsubEndpoint_setFields(*psEp, fwUUID, scope, topic, serviceId, endpoint, topic_props, true); + + return status; + +} + +celix_status_t pubsubEndpoint_clone(pubsub_endpoint_pt in, pubsub_endpoint_pt *out){ + celix_status_t status = CELIX_SUCCESS; + - *out = calloc(1,sizeof(**out)); ++ pubsub_endpoint_pt ep = calloc(1,sizeof(*ep)); + - pubsubEndpoint_setFields(*out, in->frameworkUUID, in->scope, in->topic, in->serviceID, in->endpoint, in->topic_props, true); ++ status = properties_copy(in->endpoint_props, &(ep->endpoint_props)); ++ ++ if (in->topic_props != NULL) { ++ status += properties_copy(in->topic_props, &(ep->topic_props)); ++ } ++ ++ ep->serviceID = in->serviceID; ++ ep->is_secure = in->is_secure; ++ ++ if (status == CELIX_SUCCESS) { ++ *out = ep; ++ } else { ++ pubsubEndpoint_destroy(ep); ++ } + + return status; + +} + +celix_status_t pubsubEndpoint_createFromServiceReference(service_reference_pt reference, pubsub_endpoint_pt* psEp, bool isPublisher){ + celix_status_t status = CELIX_SUCCESS; + + pubsub_endpoint_pt ep = calloc(1,sizeof(*ep)); + + bundle_pt bundle = NULL; + bundle_context_pt ctxt = NULL; + const char* fwUUID = NULL; + serviceReference_getBundle(reference,&bundle); + bundle_getContext(bundle,&ctxt); + bundleContext_getProperty(ctxt,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + + const char* scope = NULL; + serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_SCOPE,&scope); + + const char* topic = NULL; + serviceReference_getProperty(reference, PUBSUB_SUBSCRIBER_TOPIC,&topic); + + const char* serviceId = NULL; + serviceReference_getProperty(reference,(char*)OSGI_FRAMEWORK_SERVICE_ID,&serviceId); + + /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ + properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); + + pubsubEndpoint_setFields(ep, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, strtol(serviceId,NULL,10), NULL, topic_props, false); + - if (!ep->frameworkUUID || !ep->serviceID || !ep->scope || !ep->topic) { ++ if (!properties_get(ep->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID) || ++ !ep->serviceID || ++ !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_SCOPE) || ++ !properties_get(ep->endpoint_props, PUBSUB_ENDPOINT_TOPIC)) { ++ + fw_log(logger, OSGI_FRAMEWORK_LOG_ERROR, "PUBSUB_ENDPOINT: incomplete description!."); + status = CELIX_BUNDLE_EXCEPTION; + pubsubEndpoint_destroy(ep); + *psEp = NULL; + } + else{ + *psEp = ep; + } + + return status; + +} + +celix_status_t pubsubEndpoint_createFromListenerHookInfo(listener_hook_info_pt info,pubsub_endpoint_pt* psEp, bool isPublisher){ + celix_status_t status = CELIX_SUCCESS; + + const char* fwUUID=NULL; + bundleContext_getProperty(info->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + + if(fwUUID==NULL){ + return CELIX_BUNDLE_EXCEPTION; + } + + char* topic = pubsub_getTopicFromFilter(info->filter); + if(topic==NULL){ + return CELIX_BUNDLE_EXCEPTION; + } + + *psEp = calloc(1, sizeof(**psEp)); + + char* scope = pubsub_getScopeFromFilter(info->filter); + if(scope == NULL) { + scope = strdup(PUBSUB_PUBLISHER_SCOPE_DEFAULT); + } + + bundle_pt bundle = NULL; + long bundleId = -1; + bundleContext_getBundle(info->context,&bundle); + + bundle_getBundleId(bundle,&bundleId); + + properties_pt topic_props = pubsubEndpoint_getTopicProperties(bundle, topic, isPublisher); + + /* TODO: is topic_props==NULL a fatal error such that EP cannot be created? */ + pubsubEndpoint_setFields(*psEp, fwUUID, scope!=NULL?scope:PUBSUB_SUBSCRIBER_SCOPE_DEFAULT, topic, bundleId, NULL, topic_props, false); + + free(topic); + free(scope); + + + return status; +} + +celix_status_t pubsubEndpoint_destroy(pubsub_endpoint_pt psEp){ + - if(psEp->frameworkUUID!=NULL){ - free(psEp->frameworkUUID); - psEp->frameworkUUID = NULL; - } - - if(psEp->scope!=NULL){ - free(psEp->scope); - psEp->scope = NULL; - } - - if(psEp->topic!=NULL){ - free(psEp->topic); - psEp->topic = NULL; - } - - if(psEp->endpoint!=NULL){ - free(psEp->endpoint); - psEp->endpoint = NULL; - } - + if(psEp->topic_props != NULL){ + properties_destroy(psEp->topic_props); + } + ++ if (psEp->endpoint_props != NULL) { ++ properties_destroy(psEp->endpoint_props); ++ } ++ + free(psEp); + + return CELIX_SUCCESS; + +} + +bool pubsubEndpoint_equals(pubsub_endpoint_pt psEp1,pubsub_endpoint_pt psEp2){ + - return ((strcmp(psEp1->frameworkUUID,psEp2->frameworkUUID)==0) && - (strcmp(psEp1->scope,psEp2->scope)==0) && - (strcmp(psEp1->topic,psEp2->topic)==0) && ++ return ((strcmp(properties_get(psEp1->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(psEp2->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID))==0) && ++ (strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_SCOPE))==0) && ++ (strcmp(properties_get(psEp1->endpoint_props, PUBSUB_ENDPOINT_TOPIC),properties_get(psEp2->endpoint_props, PUBSUB_ENDPOINT_TOPIC))==0) && + (psEp1->serviceID == psEp2->serviceID) /*&& + ((psEp1->endpoint==NULL && psEp2->endpoint==NULL)||(strcmp(psEp1->endpoint,psEp2->endpoint)==0))*/ + ); +} + +char *createScopeTopicKey(const char* scope, const char* topic) { + char *result = NULL; + asprintf(&result, "%s:%s", scope, topic); + + return result; +} http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c ---------------------------------------------------------------------- diff --cc pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c index 2ac75c9,0000000..a63b275 mode 100644,000000..100644 --- a/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c +++ b/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c @@@ -1,721 -1,0 +1,727 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * pubsub_topology_manager.c + * + * \date Sep 29, 2011 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdbool.h> + +#include "hash_map.h" +#include "array_list.h" +#include "bundle_context.h" +#include "constants.h" +#include "module.h" +#include "bundle.h" +#include "filter.h" +#include "listener_hook_service.h" +#include "utils.h" +#include "service_reference.h" +#include "service_registration.h" +#include "log_service.h" +#include "log_helper.h" + +#include "publisher_endpoint_announce.h" +#include "pubsub_topology_manager.h" +#include "pubsub_endpoint.h" +#include "pubsub_admin.h" +#include "pubsub_utils.h" + + +celix_status_t pubsub_topologyManager_create(bundle_context_pt context, log_helper_pt logHelper, pubsub_topology_manager_pt *manager) { + celix_status_t status = CELIX_SUCCESS; + + *manager = calloc(1, sizeof(**manager)); + if (!*manager) { + return CELIX_ENOMEM; + } + + (*manager)->context = context; + + celix_thread_mutexattr_t psaAttr; + celixThreadMutexAttr_create(&psaAttr); + celixThreadMutexAttr_settype(&psaAttr, CELIX_THREAD_MUTEX_RECURSIVE); + status = celixThreadMutex_create(&(*manager)->psaListLock, &psaAttr); + celixThreadMutexAttr_destroy(&psaAttr); + + status = celixThreadMutex_create(&(*manager)->publicationsLock, NULL); + status = celixThreadMutex_create(&(*manager)->subscriptionsLock, NULL); + status = celixThreadMutex_create(&(*manager)->discoveryListLock, NULL); + + arrayList_create(&(*manager)->psaList); + + (*manager)->discoveryList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); + (*manager)->publications = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + (*manager)->subscriptions = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); + + (*manager)->loghelper = logHelper; + + return status; +} + +celix_status_t pubsub_topologyManager_destroy(pubsub_topology_manager_pt manager) { + celix_status_t status = CELIX_SUCCESS; + + celixThreadMutex_lock(&manager->discoveryListLock); + hashMap_destroy(manager->discoveryList, false, false); + celixThreadMutex_unlock(&manager->discoveryListLock); + celixThreadMutex_destroy(&manager->discoveryListLock); + + celixThreadMutex_lock(&manager->psaListLock); + arrayList_destroy(manager->psaList); + celixThreadMutex_unlock(&manager->psaListLock); + celixThreadMutex_destroy(&manager->psaListLock); + + celixThreadMutex_lock(&manager->publicationsLock); + hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); + while(hashMapIterator_hasNext(pubit)){ + array_list_pt l = (array_list_pt)hashMapIterator_nextValue(pubit); + int i; + for(i=0;i<arrayList_size(l);i++){ + pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); + } + arrayList_destroy(l); + } + hashMapIterator_destroy(pubit); + hashMap_destroy(manager->publications, true, false); + celixThreadMutex_unlock(&manager->publicationsLock); + celixThreadMutex_destroy(&manager->publicationsLock); + + celixThreadMutex_lock(&manager->subscriptionsLock); + hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions); + while(hashMapIterator_hasNext(subit)){ + array_list_pt l = (array_list_pt)hashMapIterator_nextValue(subit); + int i; + for(i=0;i<arrayList_size(l);i++){ + pubsubEndpoint_destroy((pubsub_endpoint_pt)arrayList_get(l,i)); + } + arrayList_destroy(l); + } + hashMapIterator_destroy(subit); + hashMap_destroy(manager->subscriptions, true, false); + celixThreadMutex_unlock(&manager->subscriptionsLock); + celixThreadMutex_destroy(&manager->subscriptionsLock); + + free(manager); + + return status; +} + +celix_status_t pubsub_topologyManager_psaAdded(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + pubsub_topology_manager_pt manager = handle; + int i; + + pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service; + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Added PSA"); + + celixThreadMutex_lock(&manager->psaListLock); + arrayList_add(manager->psaList, psa); + celixThreadMutex_unlock(&manager->psaListLock); + + // Add already detected subscriptions to new PSA + celixThreadMutex_lock(&manager->subscriptionsLock); + hash_map_iterator_pt subscriptionsIterator = hashMapIterator_create(manager->subscriptions); + + while (hashMapIterator_hasNext(subscriptionsIterator)) { + array_list_pt sub_ep_list = hashMapIterator_nextValue(subscriptionsIterator); + for(i=0;i<arrayList_size(sub_ep_list);i++){ + status += psa->addSubscription(psa->admin, (pubsub_endpoint_pt)arrayList_get(sub_ep_list,i)); + } + } + + hashMapIterator_destroy(subscriptionsIterator); + + celixThreadMutex_unlock(&manager->subscriptionsLock); + + // Add already detected publications to new PSA + status = celixThreadMutex_lock(&manager->publicationsLock); + hash_map_iterator_pt publicationsIterator = hashMapIterator_create(manager->publications); + + while (hashMapIterator_hasNext(publicationsIterator)) { + array_list_pt pub_ep_list = hashMapIterator_nextValue(publicationsIterator); + for(i=0;i<arrayList_size(pub_ep_list);i++){ + status += psa->addPublication(psa->admin, (pubsub_endpoint_pt)arrayList_get(pub_ep_list,i)); + } + } + + hashMapIterator_destroy(publicationsIterator); + + celixThreadMutex_unlock(&manager->publicationsLock); + + return status; +} + +celix_status_t pubsub_topologyManager_psaModified(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + + // Nop... + + return status; +} + +celix_status_t pubsub_topologyManager_psaRemoved(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + pubsub_topology_manager_pt manager = handle; + + pubsub_admin_service_pt psa = (pubsub_admin_service_pt) service; + + /* Deactivate all publications */ + celixThreadMutex_lock(&manager->publicationsLock); + + hash_map_iterator_pt pubit = hashMapIterator_create(manager->publications); + while(hashMapIterator_hasNext(pubit)){ + hash_map_entry_pt pub_entry = hashMapIterator_nextEntry(pubit); + char* scope_topic_key = (char*)hashMapEntry_getKey(pub_entry); + // Extract scope/topic name from key + char scope[MAX_SCOPE_LEN]; + char topic[MAX_TOPIC_LEN]; + sscanf(scope_topic_key, "%[^:]:%s", scope, topic ); + array_list_pt pubEP_list = (array_list_pt)hashMapEntry_getValue(pub_entry); + + status = psa->closeAllPublications(psa->admin,scope,topic); + + if(status==CELIX_SUCCESS){ + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); + const char* fwUUID = NULL; + bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + int i; + for(i=0;i<arrayList_size(pubEP_list);i++){ + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if(strcmp(pubEP->frameworkUUID,fwUUID)==0){ ++ if(strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0){ + disc->removePublisher(disc->handle,pubEP); + } + } + bundleContext_ungetService(manager->context, disc_sr, NULL); + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); + } + } + hashMapIterator_destroy(pubit); + + celixThreadMutex_unlock(&manager->publicationsLock); + + /* Deactivate all subscriptions */ + celixThreadMutex_lock(&manager->subscriptionsLock); + hash_map_iterator_pt subit = hashMapIterator_create(manager->subscriptions); + while(hashMapIterator_hasNext(subit)){ + // TODO do some error checking + char* scope_topic = (char*)hashMapIterator_nextKey(subit); + char scope[MAX_TOPIC_LEN]; + char topic[MAX_TOPIC_LEN]; + memset(scope, 0 , MAX_TOPIC_LEN*sizeof(char)); + memset(topic, 0 , MAX_TOPIC_LEN*sizeof(char)); + sscanf(scope_topic, "%[^:]:%s", scope, topic ); + status += psa->closeAllSubscriptions(psa->admin,scope, topic); + } + hashMapIterator_destroy(subit); + celixThreadMutex_unlock(&manager->subscriptionsLock); + + celixThreadMutex_lock(&manager->psaListLock); + arrayList_removeElement(manager->psaList, psa); + celixThreadMutex_unlock(&manager->psaListLock); + + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "PSTM: Removed PSA"); + + return status; +} + +celix_status_t pubsub_topologyManager_subscriberAdded(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + pubsub_topology_manager_pt manager = handle; + //subscriber_service_pt subscriber = (subscriber_service_pt)service; + + pubsub_endpoint_pt sub = NULL; + if(pubsubEndpoint_createFromServiceReference(reference,&sub,false) == CELIX_SUCCESS){ + celixThreadMutex_lock(&manager->subscriptionsLock); - char *sub_key = createScopeTopicKey(sub->scope, sub->topic); ++ char *sub_key = createScopeTopicKey(properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + + array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); + if(sub_list_by_topic==NULL){ + arrayList_create(&sub_list_by_topic); + hashMap_put(manager->subscriptions,strdup(sub_key),sub_list_by_topic); + } + free(sub_key); + arrayList_add(sub_list_by_topic,sub); + + celixThreadMutex_unlock(&manager->subscriptionsLock); + + int j; + double score = 0; + double best_score = 0; + pubsub_admin_service_pt best_psa = NULL; + celixThreadMutex_lock(&manager->psaListLock); + for(j=0;j<arrayList_size(manager->psaList);j++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); + psa->matchEndpoint(psa->admin,sub,&score); + if(score>best_score){ /* We have a new winner! */ + best_score = score; + best_psa = psa; + } + } + + if(best_psa != NULL && best_score>0){ + best_psa->addSubscription(best_psa->admin,sub); + } + + // Inform discoveries for interest in the topic + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->interestedInTopic(disc->handle, sub->scope, sub->topic); ++ disc->interestedInTopic(disc->handle, properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(sub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + bundleContext_ungetService(manager->context, disc_sr, NULL); + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); + + celixThreadMutex_unlock(&manager->psaListLock); + } + else{ + status=CELIX_INVALID_BUNDLE_CONTEXT; + } + + return status; +} + +celix_status_t pubsub_topologyManager_subscriberModified(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + + // Nop... + + return status; +} + +celix_status_t pubsub_topologyManager_subscriberRemoved(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + pubsub_topology_manager_pt manager = handle; + + pubsub_endpoint_pt subcmp = NULL; + if(pubsubEndpoint_createFromServiceReference(reference,&subcmp,false) == CELIX_SUCCESS){ + + int j,k; + + // Inform discoveries that we not interested in the topic any more + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); - disc->uninterestedInTopic(disc->handle, subcmp->scope, subcmp->topic); ++ disc->uninterestedInTopic(disc->handle, properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + bundleContext_ungetService(manager->context, disc_sr, NULL); + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); + + celixThreadMutex_lock(&manager->subscriptionsLock); + celixThreadMutex_lock(&manager->psaListLock); + - char *sub_key = createScopeTopicKey(subcmp->scope,subcmp->topic); ++ char *sub_key = createScopeTopicKey(properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE),properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt sub_list_by_topic = hashMap_get(manager->subscriptions,sub_key); + free(sub_key); + if(sub_list_by_topic!=NULL){ + for(j=0;j<arrayList_size(sub_list_by_topic);j++){ + pubsub_endpoint_pt sub = arrayList_get(sub_list_by_topic,j); + if(pubsubEndpoint_equals(sub,subcmp)){ + for(k=0;k<arrayList_size(manager->psaList);k++){ + /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); + psa->removeSubscription(psa->admin,sub); + } + + } + arrayList_remove(sub_list_by_topic,j); + + /* If it was the last subscriber for this topic, tell PSA to close the ZMQ socket */ + if(arrayList_size(sub_list_by_topic)==0){ + for(k=0;k<arrayList_size(manager->psaList);k++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllSubscriptions(psa->admin,sub->scope, sub->topic); ++ psa->closeAllSubscriptions(psa->admin, (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(subcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + } + } + + pubsubEndpoint_destroy(sub); + + } + } + + celixThreadMutex_unlock(&manager->psaListLock); + celixThreadMutex_unlock(&manager->subscriptionsLock); + + pubsubEndpoint_destroy(subcmp); + + } + else{ + status=CELIX_INVALID_BUNDLE_CONTEXT; + } + + return status; + +} + +celix_status_t pubsub_topologyManager_pubsubDiscoveryAdded(void* handle, service_reference_pt reference, void* service) { + celix_status_t status = CELIX_SUCCESS; + pubsub_topology_manager_pt manager = (pubsub_topology_manager_pt)handle; + publisher_endpoint_announce_pt disc = (publisher_endpoint_announce_pt)service; + + const char* fwUUID = NULL; + + bundleContext_getProperty(manager->context,OSGI_FRAMEWORK_FRAMEWORK_UUID,&fwUUID); + if(fwUUID==NULL){ + printf("PSD: ERRROR: Cannot retrieve fwUUID.\n"); + return CELIX_INVALID_BUNDLE_CONTEXT; + } + + celixThreadMutex_lock(&manager->publicationsLock); + + celixThreadMutex_lock(&manager->discoveryListLock); + hashMap_put(manager->discoveryList, reference, NULL); + celixThreadMutex_unlock(&manager->discoveryListLock); + + hash_map_iterator_pt iter = hashMapIterator_create(manager->publications); + while(hashMapIterator_hasNext(iter)){ + array_list_pt pubEP_list = (array_list_pt)hashMapIterator_nextValue(iter); + for(int i = 0; i < arrayList_size(pubEP_list); i++) { + pubsub_endpoint_pt pubEP = (pubsub_endpoint_pt)arrayList_get(pubEP_list,i); - if( (strcmp(pubEP->frameworkUUID,fwUUID)==0) && (pubEP->endpoint!=NULL)){ ++ if( (strcmp(properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),fwUUID)==0) && (properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)!=NULL)){ + status += disc->announcePublisher(disc->handle,pubEP); + } + } + } + hashMapIterator_destroy(iter); + + celixThreadMutex_unlock(&manager->publicationsLock); + + celixThreadMutex_lock(&manager->subscriptionsLock); + iter = hashMapIterator_create(manager->subscriptions); + + while(hashMapIterator_hasNext(iter)) { + array_list_pt l = (array_list_pt)hashMapIterator_nextValue(iter); + int i; + for(i=0;i<arrayList_size(l);i++){ + pubsub_endpoint_pt subEp = (pubsub_endpoint_pt)arrayList_get(l,i); + - disc->interestedInTopic(disc->handle, subEp->scope, subEp->topic); ++ disc->interestedInTopic(disc->handle, properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(subEp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + } + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->subscriptionsLock); + + return status; +} + +celix_status_t pubsub_topologyManager_pubsubDiscoveryModified(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + + status = pubsub_topologyManager_pubsubDiscoveryRemoved(handle, reference, service); + if (status == CELIX_SUCCESS) { + status = pubsub_topologyManager_pubsubDiscoveryAdded(handle, reference, service); + } + + return status; +} + +celix_status_t pubsub_topologyManager_pubsubDiscoveryRemoved(void * handle, service_reference_pt reference, void * service) { + celix_status_t status = CELIX_SUCCESS; + + pubsub_topology_manager_pt manager = handle; + + celixThreadMutex_lock(&manager->discoveryListLock); + + + if (hashMap_remove(manager->discoveryList, reference)) { + logHelper_log(manager->loghelper, OSGI_LOGSERVICE_INFO, "EndpointListener Removed"); + } + + celixThreadMutex_unlock(&manager->discoveryListLock); + + return status; +} + + +celix_status_t pubsub_topologyManager_publisherTrackerAdded(void *handle, array_list_pt listeners) { + + celix_status_t status = CELIX_SUCCESS; + pubsub_topology_manager_pt manager = handle; + + int l_index; + + for (l_index = 0; l_index < arrayList_size(listeners); l_index++) { + + listener_hook_info_pt info = arrayList_get(listeners, l_index); + + pubsub_endpoint_pt pub = NULL; + if(pubsubEndpoint_createFromListenerHookInfo(info, &pub, true) == CELIX_SUCCESS){ + + celixThreadMutex_lock(&manager->publicationsLock); - char *pub_key = createScopeTopicKey(pub->scope, pub->topic); ++ char *pub_key = createScopeTopicKey(properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pub_list_by_topic = hashMap_get(manager->publications, pub_key); + if(pub_list_by_topic==NULL){ + arrayList_create(&pub_list_by_topic); + hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic); + } + free(pub_key); + arrayList_add(pub_list_by_topic,pub); + + celixThreadMutex_unlock(&manager->publicationsLock); + + int j; + double score = 0; + double best_score = 0; + pubsub_admin_service_pt best_psa = NULL; + celixThreadMutex_lock(&manager->psaListLock); + + for(j=0;j<arrayList_size(manager->psaList);j++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); + psa->matchEndpoint(psa->admin,pub,&score); + if(score>best_score){ /* We have a new winner! */ + best_score = score; + best_psa = psa; + } + } + + if(best_psa != NULL && best_score>0){ + status = best_psa->addPublication(best_psa->admin,pub); + if(status==CELIX_SUCCESS){ + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); + disc->announcePublisher(disc->handle,pub); + bundleContext_ungetService(manager->context, disc_sr, NULL); + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); + } + } + + celixThreadMutex_unlock(&manager->psaListLock); + + } + + } + + return status; + +} + + +celix_status_t pubsub_topologyManager_publisherTrackerRemoved(void *handle, array_list_pt listeners) { + celix_status_t status = CELIX_SUCCESS; + pubsub_topology_manager_pt manager = handle; + + int l_index; + + for (l_index = 0; l_index < arrayList_size(listeners); l_index++) { + + listener_hook_info_pt info = arrayList_get(listeners, l_index); + + pubsub_endpoint_pt pubcmp = NULL; + if(pubsubEndpoint_createFromListenerHookInfo(info,&pubcmp,true) == CELIX_SUCCESS){ + + + int j,k; + celixThreadMutex_lock(&manager->psaListLock); + celixThreadMutex_lock(&manager->publicationsLock); + - char *pub_key = createScopeTopicKey(pubcmp->scope, pubcmp->topic); ++ char *pub_key = createScopeTopicKey(properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubcmp->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); + if(pub_list_by_topic!=NULL){ + for(j=0;j<arrayList_size(pub_list_by_topic);j++){ + pubsub_endpoint_pt pub = arrayList_get(pub_list_by_topic,j); + if(pubsubEndpoint_equals(pub,pubcmp)){ + for(k=0;k<arrayList_size(manager->psaList);k++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); + status = psa->removePublication(psa->admin,pub); + if(status==CELIX_SUCCESS){ /* We found the one that manages this endpoint */ + celixThreadMutex_lock(&manager->discoveryListLock); + hash_map_iterator_pt iter = hashMapIterator_create(manager->discoveryList); + while(hashMapIterator_hasNext(iter)){ + service_reference_pt disc_sr = (service_reference_pt)hashMapIterator_nextKey(iter); + publisher_endpoint_announce_pt disc = NULL; + bundleContext_getService(manager->context, disc_sr, (void**) &disc); + disc->removePublisher(disc->handle,pub); + bundleContext_ungetService(manager->context, disc_sr, NULL); + } + hashMapIterator_destroy(iter); + celixThreadMutex_unlock(&manager->discoveryListLock); + } + else if(status == CELIX_ILLEGAL_ARGUMENT){ /* Not a real error, just saying this psa does not handle this endpoint */ + status = CELIX_SUCCESS; + } + } + //} + arrayList_remove(pub_list_by_topic,j); + + /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket and then inform the discovery */ + if(arrayList_size(pub_list_by_topic)==0){ + for(k=0;k<arrayList_size(manager->psaList);k++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,k); - psa->closeAllPublications(psa->admin,pub->scope, pub->topic); ++ psa->closeAllPublications(psa->admin, (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(pub->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + } + } + + pubsubEndpoint_destroy(pub); + } + + } + } + + celixThreadMutex_unlock(&manager->publicationsLock); + celixThreadMutex_unlock(&manager->psaListLock); + + free(pub_key); + + pubsubEndpoint_destroy(pubcmp); + + } + + } + + return status; +} + +celix_status_t pubsub_topologyManager_announcePublisher(void *handle, pubsub_endpoint_pt pubEP){ + celix_status_t status = CELIX_SUCCESS; - printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint); ++ printf("PSTM: New publisher discovered for topic %s [fwUUID=%s, ep=%s]\n", ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + + pubsub_topology_manager_pt manager = handle; + celixThreadMutex_lock(&manager->psaListLock); + celixThreadMutex_lock(&manager->publicationsLock); + - char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic); ++ char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + + array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); + if(pub_list_by_topic==NULL){ + arrayList_create(&pub_list_by_topic); + hashMap_put(manager->publications,strdup(pub_key),pub_list_by_topic); + } + free(pub_key); + + /* Shouldn't be any other duplicate, since it's filtered out by the discovery */ + pubsub_endpoint_pt p = NULL; + pubsubEndpoint_clone(pubEP, &p); + arrayList_add(pub_list_by_topic,p); + + int j; + double score = 0; + double best_score = 0; + pubsub_admin_service_pt best_psa = NULL; + + for(j=0;j<arrayList_size(manager->psaList);j++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,j); + psa->matchEndpoint(psa->admin,p,&score); + if(score>best_score){ /* We have a new winner! */ + best_score = score; + best_psa = psa; + } + } + + if(best_psa != NULL && best_score>0){ + best_psa->addPublication(best_psa->admin,p); + } + else{ + status = CELIX_ILLEGAL_STATE; + } + + celixThreadMutex_unlock(&manager->publicationsLock); + celixThreadMutex_unlock(&manager->psaListLock); + + return status; +} + +celix_status_t pubsub_topologyManager_removePublisher(void *handle, pubsub_endpoint_pt pubEP){ + celix_status_t status = CELIX_SUCCESS; - printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n",pubEP->topic,pubEP->frameworkUUID,pubEP->endpoint); ++ printf("PSTM: Publisher removed for topic %s [fwUUID=%s, ep=%s]\n", ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC), ++ properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID), ++ properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + + pubsub_topology_manager_pt manager = handle; + celixThreadMutex_lock(&manager->psaListLock); + celixThreadMutex_lock(&manager->publicationsLock); + int i; + - char *pub_key = createScopeTopicKey(pubEP->scope, pubEP->topic); ++ char *pub_key = createScopeTopicKey(properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_SCOPE), properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + array_list_pt pub_list_by_topic = hashMap_get(manager->publications,pub_key); + if(pub_list_by_topic==NULL){ - printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,pubEP->frameworkUUID,pubEP->endpoint); ++ printf("PSTM: ERROR: Cannot find topic for known endpoint [%s,%s,%s]. Something is inconsistent.\n",pub_key,properties_get(pubEP->endpoint_props, OSGI_FRAMEWORK_FRAMEWORK_UUID),properties_get(pubEP->endpoint_props, PUBSUB_ENDPOINT_URL)); + status = CELIX_ILLEGAL_STATE; + } + else{ + + pubsub_endpoint_pt p = NULL; + bool found = false; + + for(i=0;!found && i<arrayList_size(pub_list_by_topic);i++){ + p = (pubsub_endpoint_pt)arrayList_get(pub_list_by_topic,i); + found = pubsubEndpoint_equals(p,pubEP); + } + + if(found && p !=NULL){ + + for(i=0;i<arrayList_size(manager->psaList);i++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); + /* No problem with invoking removal on all psa's, only the one that manage this topic will do something */ + psa->removePublication(psa->admin,p); + } + + arrayList_removeElement(pub_list_by_topic,p); + + /* If it was the last publisher for this topic, tell PSA to close the ZMQ socket */ + if(arrayList_size(pub_list_by_topic)==0){ + + for(i=0;i<arrayList_size(manager->psaList);i++){ + pubsub_admin_service_pt psa = (pubsub_admin_service_pt)arrayList_get(manager->psaList,i); - psa->closeAllPublications(psa->admin,p->scope, p->topic); ++ psa->closeAllPublications(psa->admin, (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_SCOPE), (char*) properties_get(p->endpoint_props, PUBSUB_ENDPOINT_TOPIC)); + } + } + + pubsubEndpoint_destroy(p); + } + + + } + free(pub_key); + celixThreadMutex_unlock(&manager->publicationsLock); + celixThreadMutex_unlock(&manager->psaListLock); + + + return status; +} + http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/utils/include/properties.h ---------------------------------------------------------------------- diff --cc utils/include/properties.h index 5c6dc4d,0000000..582a242 mode 100644,000000..100644 --- a/utils/include/properties.h +++ b/utils/include/properties.h @@@ -1,68 -1,0 +1,70 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * properties.h + * + * \date Apr 27, 2010 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ + +#ifndef PROPERTIES_H_ +#define PROPERTIES_H_ + +#include <stdio.h> + +#include "hash_map.h" +#include "exports.h" +#include "celix_errno.h" +#ifdef __cplusplus +extern "C" { +#endif +typedef hash_map_pt properties_pt; +typedef hash_map_t properties_t; + +UTILS_EXPORT properties_pt properties_create(void); + +UTILS_EXPORT void properties_destroy(properties_pt properties); + +UTILS_EXPORT properties_pt properties_load(const char *filename); + +UTILS_EXPORT properties_pt properties_loadWithStream(FILE *stream); + +UTILS_EXPORT properties_pt properties_loadFromString(const char *input); + +UTILS_EXPORT void properties_store(properties_pt properties, const char *file, const char *header); + +UTILS_EXPORT const char *properties_get(properties_pt properties, const char *key); + +UTILS_EXPORT const char *properties_getWithDefault(properties_pt properties, const char *key, const char *defaultValue); + +UTILS_EXPORT void properties_set(properties_pt properties, const char *key, const char *value); + ++UTILS_EXPORT void properties_unset(properties_pt properties, const char *key); ++ +UTILS_EXPORT celix_status_t properties_copy(properties_pt properties, properties_pt *copy); + +#define PROPERTIES_FOR_EACH(props, key) \ + for(hash_map_iterator_t iter = hashMapIterator_construct(props); \ + hashMapIterator_hasNext(&iter), (key) = (const char*)hashMapIterator_nextKey(&iter);) +#ifdef __cplusplus +} +#endif + +#endif /* PROPERTIES_H_ */ http://git-wip-us.apache.org/repos/asf/celix/blob/0a5ef69a/utils/src/properties.c ---------------------------------------------------------------------- diff --cc utils/src/properties.c index 1e097a0,0000000..860b9bb mode 100644,000000..100644 --- a/utils/src/properties.c +++ b/utils/src/properties.c @@@ -1,330 -1,0 +1,335 @@@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ +/* + * properties.c + * + * \date Apr 27, 2010 + * \author <a href="mailto:d...@celix.apache.org">Apache Celix Project Team</a> + * \copyright Apache License, Version 2.0 + */ +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <ctype.h> +#include "celixbool.h" +#include "properties.h" +#include "utils.h" + +#define MALLOC_BLOCK_SIZE 5 + +static void parseLine(const char* line, properties_pt props); + +properties_pt properties_create(void) { + return hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals); +} + +void properties_destroy(properties_pt properties) { + hash_map_iterator_pt iter = hashMapIterator_create(properties); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + free(hashMapEntry_getKey(entry)); + free(hashMapEntry_getValue(entry)); + } + hashMapIterator_destroy(iter); + hashMap_destroy(properties, false, false); +} + +properties_pt properties_load(const char* filename) { + FILE *file = fopen(filename, "r"); + if(file==NULL){ + return NULL; + } + properties_pt props = properties_loadWithStream(file); + fclose(file); + return props; +} + +properties_pt properties_loadWithStream(FILE *file) { + properties_pt props = NULL; + + + if (file != NULL ) { + char *saveptr; + char *filebuffer = NULL; + char *line = NULL; + size_t file_size = 0; + + props = properties_create(); + fseek(file, 0, SEEK_END); + file_size = ftell(file); + fseek(file, 0, SEEK_SET); + + if(file_size > 0){ + filebuffer = calloc(file_size + 1, sizeof(char)); + if(filebuffer) { + size_t rs = fread(filebuffer, sizeof(char), file_size, file); + if(rs != file_size){ + fprintf(stderr,"fread read only %lu bytes out of %lu\n",rs,file_size); + } + filebuffer[file_size]='\0'; + line = strtok_r(filebuffer, "\n", &saveptr); + while ( line != NULL ) { + parseLine(line, props); + line = strtok_r(NULL, "\n", &saveptr); + } + free(filebuffer); + } + } + } + + return props; +} + +properties_pt properties_loadFromString(const char *input){ + properties_pt props = properties_create(); + + char *in = strdup(input); + char *line = NULL; + char *saveLinePointer = NULL; + + bool firstTime = true; + do { + if (firstTime){ + line = strtok_r(in, "\n", &saveLinePointer); + firstTime = false; + }else { + line = strtok_r(NULL, "\n", &saveLinePointer); + } + + if (line == NULL){ + break; + } + + parseLine(line, props); + } while(line != NULL); + + free(in); + + return props; +} + + +/** + * Header is ignored for now, cannot handle comments yet + */ +void properties_store(properties_pt properties, const char* filename, const char* header) { + FILE *file = fopen ( filename, "w+" ); + char *str; + + if (file != NULL) { + if (hashMap_size(properties) > 0) { + hash_map_iterator_pt iterator = hashMapIterator_create(properties); + while (hashMapIterator_hasNext(iterator)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iterator); + str = hashMapEntry_getKey(entry); + for (int i = 0; i < strlen(str); i += 1) { + if (str[i] == '#' || str[i] == '!' || str[i] == '=' || str[i] == ':') { + fputc('\\', file); + } + fputc(str[i], file); + } + + fputc('=', file); + + str = hashMapEntry_getValue(entry); + for (int i = 0; i < strlen(str); i += 1) { + if (str[i] == '#' || str[i] == '!' || str[i] == '=' || str[i] == ':') { + fputc('\\', file); + } + fputc(str[i], file); + } + + fputc('\n', file); + + } + hashMapIterator_destroy(iterator); + } + fclose(file); + } else { + perror("File is null"); + } +} + +celix_status_t properties_copy(properties_pt properties, properties_pt *out) { + celix_status_t status = CELIX_SUCCESS; + properties_pt copy = properties_create(); + + if (copy != NULL) { + hash_map_iterator_pt iter = hashMapIterator_create(properties); + while (hashMapIterator_hasNext(iter)) { + hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); + char *key = hashMapEntry_getKey(entry); + char *value = hashMapEntry_getValue(entry); + properties_set(copy, key, value); + } + hashMapIterator_destroy(iter); + } else { + status = CELIX_ENOMEM; + } + + if (status == CELIX_SUCCESS) { + *out = copy; + } + + return status; +} + +const char* properties_get(properties_pt properties, const char* key) { + return hashMap_get(properties, (void*)key); +} + +const char* properties_getWithDefault(properties_pt properties, const char* key, const char* defaultValue) { + const char* value = properties_get(properties, key); + return value == NULL ? defaultValue : value; +} + +void properties_set(properties_pt properties, const char* key, const char* value) { + hash_map_entry_pt entry = hashMap_getEntry(properties, key); + char* oldValue = NULL; + if (entry != NULL) { + char* oldKey = hashMapEntry_getKey(entry); + oldValue = hashMapEntry_getValue(entry); + hashMap_put(properties, oldKey, strndup(value, 1024*10)); + } else { + hashMap_put(properties, strndup(key, 1024*10), strndup(value, 1024*10)); + } + free(oldValue); +} + ++void properties_unset(properties_pt properties, const char* key) { ++ char* oldValue = hashMap_remove(properties, key); ++ free(oldValue); ++} ++ +static void updateBuffers(char **key, char ** value, char **output, int outputPos, int *key_len, int *value_len) { + if (*output == *key) { + if (outputPos == (*key_len) - 1) { + (*key_len) += MALLOC_BLOCK_SIZE; + *key = realloc(*key, *key_len); + *output = *key; + } + } + else { + if (outputPos == (*value_len) - 1) { + (*value_len) += MALLOC_BLOCK_SIZE; + *value = realloc(*value, *value_len); + *output = *value; + } + } +} + +static void parseLine(const char* line, properties_pt props) { + int linePos = 0; + bool precedingCharIsBackslash = false; + bool isComment = false; + int outputPos = 0; + char *output = NULL; + int key_len = MALLOC_BLOCK_SIZE; + int value_len = MALLOC_BLOCK_SIZE; + linePos = 0; + precedingCharIsBackslash = false; + isComment = false; + output = NULL; + outputPos = 0; + + //Ignore empty lines + if (line[0] == '\n' && line[1] == '\0') { + return; + } + + char *key = calloc(1, key_len); + char *value = calloc(1, value_len); + key[0] = '\0'; + value[0] = '\0'; + + while (line[linePos] != '\0') { + if (line[linePos] == ' ' || line[linePos] == '\t') { + if (output == NULL) { + //ignore + linePos += 1; + continue; + } + } + else { + if (output == NULL) { + output = key; + } + } + if (line[linePos] == '=' || line[linePos] == ':' || line[linePos] == '#' || line[linePos] == '!') { + if (precedingCharIsBackslash) { + //escaped special character + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + precedingCharIsBackslash = false; + } + else { + if (line[linePos] == '#' || line[linePos] == '!') { + if (outputPos == 0) { + isComment = true; + break; + } + else { + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + } + else { // = or : + if (output == value) { //already have a seperator + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + else { + output[outputPos++] = '\0'; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + output = value; + outputPos = 0; + } + } + } + } + else if (line[linePos] == '\\') { + if (precedingCharIsBackslash) { //double backslash -> backslash + output[outputPos++] = '\\'; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + precedingCharIsBackslash = true; + } + else { //normal character + precedingCharIsBackslash = false; + output[outputPos++] = line[linePos]; + updateBuffers(&key, &value, &output, outputPos, &key_len, &value_len); + } + linePos += 1; + } + if (output != NULL) { + output[outputPos] = '\0'; + } + + if (!isComment) { + //printf("putting 'key'/'value' '%s'/'%s' in properties\n", utils_stringTrim(key), utils_stringTrim(value)); + properties_set(props, utils_stringTrim(key), utils_stringTrim(value)); + } + if(key) { + free(key); + } + if(value) { + free(value); + } + +}