This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 40a5aa5 In C API add context pointer to callbacks (#1761) 40a5aa5 is described below commit 40a5aa5366c6e8c1ed172bfc5fcf7e1749717703 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu May 10 16:01:12 2018 -0700 In C API add context pointer to callbacks (#1761) --- .../examples/SampleConsumerListenerCApi.c | 4 +-- pulsar-client-cpp/include/pulsar/c/client.h | 20 +++++------ pulsar-client-cpp/include/pulsar/c/consumer.h | 17 ++++----- .../include/pulsar/c/consumer_configuration.h | 9 ++--- pulsar-client-cpp/include/pulsar/c/message_id.h | 6 +++- .../include/pulsar/c/message_router.h | 3 +- pulsar-client-cpp/include/pulsar/c/producer.h | 8 ++--- .../include/pulsar/c/producer_configuration.h | 2 +- pulsar-client-cpp/include/pulsar/c/reader.h | 4 +-- .../include/pulsar/c/reader_configuration.h | 4 +-- pulsar-client-cpp/lib/c/c_Client.cc | 42 +++++++++++----------- pulsar-client-cpp/lib/c/c_Consumer.cc | 28 ++++++++------- pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 12 ++++--- pulsar-client-cpp/lib/c/c_MessageId.cc | 13 ++++++- pulsar-client-cpp/lib/c/c_Producer.cc | 14 ++++---- pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 9 ++--- pulsar-client-cpp/lib/c/c_Reader.cc | 4 +-- pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc | 8 ++--- pulsar-client-cpp/lib/c/c_structs.h | 8 +++-- 19 files changed, 120 insertions(+), 95 deletions(-) diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c index 8f3ed0d..e75c5d5 100644 --- a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c +++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c @@ -20,7 +20,7 @@ #include <stdio.h> #include <pulsar/c/client.h> -static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message) { +static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message, void* ctx) { printf("Received message with payload: '%.*s'\n", pulsar_message_get_length(message), pulsar_message_get_data(message)); @@ -34,7 +34,7 @@ int main() { pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared); - pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback); + pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback, NULL); pulsar_consumer_t *consumer; pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer); diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h index 11320e7..b8ef1ed 100644 --- a/pulsar-client-cpp/include/pulsar/c/client.h +++ b/pulsar-client-cpp/include/pulsar/c/client.h @@ -40,12 +40,12 @@ typedef struct _pulsar_producer pulsar_producer_t; typedef struct _pulsar_client_configuration pulsar_client_configuration_t; typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t; -typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer); +typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer, void *ctx); -typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer); -typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader); +typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer, void *ctx); +typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void *ctx); -typedef void (*pulsar_close_callback)(pulsar_result result); +typedef void (*pulsar_close_callback)(pulsar_result result, void *ctx); /** * Create a Pulsar client object connecting to the specified cluster address and using the specified @@ -73,7 +73,7 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic, const pulsar_producer_configuration_t *conf, - pulsar_create_producer_callback callback); + pulsar_create_producer_callback callback, void *ctx); pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic, const char *subscriptionName, @@ -81,8 +81,8 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic pulsar_consumer_t **consumer); void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName, - const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer, - pulsar_subscribe_callback callback); + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx); /** * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified @@ -119,12 +119,12 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic, const pulsar_message_id_t *startMessageId, - pulsar_reader_configuration_t *conf, pulsar_reader_t **reader, - pulsar_reader_callback callback); + pulsar_reader_configuration_t *conf, pulsar_reader_callback callback, + void *ctx); pulsar_result pulsar_client_close(pulsar_client_t *client); -void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback); +void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx); void pulsar_client_free(pulsar_client_t *client); diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h index 59c99e3..2917eea 100644 --- a/pulsar-client-cpp/include/pulsar/c/consumer.h +++ b/pulsar-client-cpp/include/pulsar/c/consumer.h @@ -29,7 +29,7 @@ extern "C" { typedef struct _pulsar_consumer pulsar_consumer_t; -typedef void (*pulsar_result_callback)(pulsar_result); +typedef void (*pulsar_result_callback)(pulsar_result, void *); /** * @return the topic this consumer is subscribed to @@ -67,7 +67,8 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer); * * @param callback the callback to get notified when the operation is complete */ -void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback); +void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, + void *ctx); /** * Receive a single message. @@ -117,10 +118,10 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar * @param callback callback that will be triggered when the message has been acknowledged */ void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message, - pulsar_result_callback callback); + pulsar_result_callback callback, void *ctx); void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId, - pulsar_result_callback callback); + pulsar_result_callback callback, void *ctx); /** * Acknowledge the reception of all the messages in the stream up to (and including) @@ -155,15 +156,15 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu * @param callback callback that will be triggered when the message has been acknowledged */ void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message, - pulsar_result_callback callback); + pulsar_result_callback callback, void *ctx); void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId, - pulsar_result_callback callback); + pulsar_result_callback callback, void *ctx); pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer); -void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback); +void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx); void pulsar_consumer_free(pulsar_consumer_t *consumer); @@ -187,7 +188,7 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer); * connection * breaks, the messages are redelivered after reconnect. */ -void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer); +void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer); #ifdef __cplusplus } diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h index 3bd9571..445e34e 100644 --- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h @@ -43,7 +43,7 @@ typedef enum { } pulsar_consumer_type; /// Callback definition for MessageListener -typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg); +typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx); pulsar_consumer_configuration_t *pulsar_consumer_configuration_create(); @@ -73,10 +73,11 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type( * for every message received. */ void pulsar_consumer_configuration_set_message_listener( - pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener); + pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener, + void *ctx); -int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration, - pulsar_consumer_t *consumer); +int pulsar_consumer_configuration_has_message_listener( + pulsar_consumer_configuration_t *consumer_configuration); /** * Sets the size of the consumer receive queue. diff --git a/pulsar-client-cpp/include/pulsar/c/message_id.h b/pulsar-client-cpp/include/pulsar/c/message_id.h index a0eb684..44d0c8f 100644 --- a/pulsar-client-cpp/include/pulsar/c/message_id.h +++ b/pulsar-client-cpp/include/pulsar/c/message_id.h @@ -41,13 +41,17 @@ const pulsar_message_id_t *pulsar_message_id_latest(); /** * Serialize the message id into a binary string for storing */ -const void *pulsar_message_id_serialize(int *len); +void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len); /** * Deserialize a message id from a binary string */ pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t len); +char *pulsar_message_id_str(pulsar_message_id_t *messageId); + +void pulsar_message_id_free(pulsar_message_id_t *messageId); + #ifdef __cplusplus } #endif \ No newline at end of file diff --git a/pulsar-client-cpp/include/pulsar/c/message_router.h b/pulsar-client-cpp/include/pulsar/c/message_router.h index aea4188..07ff7a3 100644 --- a/pulsar-client-cpp/include/pulsar/c/message_router.h +++ b/pulsar-client-cpp/include/pulsar/c/message_router.h @@ -27,7 +27,8 @@ extern "C" { typedef struct _pulsar_topic_metadata pulsar_topic_metadata_t; -typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata); +typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata, + void *ctx); int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t *topicMetadata); diff --git a/pulsar-client-cpp/include/pulsar/c/producer.h b/pulsar-client-cpp/include/pulsar/c/producer.h index 6121e06..6b506b8 100644 --- a/pulsar-client-cpp/include/pulsar/c/producer.h +++ b/pulsar-client-cpp/include/pulsar/c/producer.h @@ -30,8 +30,8 @@ extern "C" { typedef struct _pulsar_producer pulsar_producer_t; -typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg); -typedef void (*pulsar_close_callback)(pulsar_result); +typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg, void *ctx); +typedef void (*pulsar_close_callback)(pulsar_result, void *ctx); /** * @return the topic to which producer is publishing to @@ -76,7 +76,7 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t * @param callback the callback to get notification of the completion */ void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg, - pulsar_send_callback callback); + pulsar_send_callback callback, void *ctx); /** * Get the last sequence id that was published by this producer. @@ -110,7 +110,7 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer); * triggered when all pending write requests are persisted. In case of errors, * pending writes will not be retried. */ -void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback); +void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx); void pulsar_producer_free(pulsar_producer_t *producer); diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h index 534c411..636fe68 100644 --- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h @@ -97,7 +97,7 @@ pulsar_partitions_routing_mode pulsar_producer_configuration_get_partitions_rout pulsar_producer_configuration_t *conf); void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf, - pulsar_message_router router); + pulsar_message_router router, void *ctx); void pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t *conf, pulsar_hashing_scheme scheme); diff --git a/pulsar-client-cpp/include/pulsar/c/reader.h b/pulsar-client-cpp/include/pulsar/c/reader.h index 648c172..547bbf2 100644 --- a/pulsar-client-cpp/include/pulsar/c/reader.h +++ b/pulsar-client-cpp/include/pulsar/c/reader.h @@ -27,7 +27,7 @@ extern "C" { typedef struct _pulsar_reader pulsar_reader_t; -typedef void (*pulsar_result_callback)(pulsar_result); +typedef void (*pulsar_result_callback)(pulsar_result, void *); /** * @return the topic this reader is reading from @@ -60,7 +60,7 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls pulsar_result pulsar_reader_close(pulsar_reader_t *reader); -void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback); +void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx); void pulsar_reader_free(pulsar_reader_t *reader); diff --git a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h index 914bbd9..c7aaf14 100644 --- a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h @@ -25,7 +25,7 @@ extern "C" { typedef struct _pulsar_reader_configuration pulsar_reader_configuration_t; -typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg); +typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg, void *ctx); pulsar_reader_configuration_t *pulsar_reader_configuration_create(); @@ -36,7 +36,7 @@ void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configurati * messages. A listener will be called in order for every message received. */ void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration, - pulsar_reader_listener listener); + pulsar_reader_listener listener, void *ctx); int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration); diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc index cec7a13..1063bb8 100644 --- a/pulsar-client-cpp/lib/c/c_Client.cc +++ b/pulsar-client-cpp/lib/c/c_Client.cc @@ -47,21 +47,21 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char } static void handle_create_producer_callback(pulsar::Result result, pulsar::Producer producer, - pulsar_create_producer_callback callback) { + pulsar_create_producer_callback callback, void *ctx) { if (result == pulsar::ResultOk) { pulsar_producer_t *c_producer = new pulsar_producer_t; c_producer->producer = producer; - callback(pulsar_result_Ok, c_producer); + callback(pulsar_result_Ok, c_producer, ctx); } else { - callback((pulsar_result)result, NULL); + callback((pulsar_result)result, NULL, ctx); } } void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic, const pulsar_producer_configuration_t *conf, - pulsar_create_producer_callback callback) { + pulsar_create_producer_callback callback, void *ctx) { client->client->createProducerAsync(topic, conf->conf, - boost::bind(&handle_create_producer_callback, _1, _2, callback)); + boost::bind(&handle_create_producer_callback, _1, _2, callback, ctx)); } pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic, @@ -81,21 +81,21 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic } static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer consumer, - pulsar_subscribe_callback callback) { + pulsar_subscribe_callback callback, void *ctx) { if (result == pulsar::ResultOk) { pulsar_consumer_t *c_consumer = new pulsar_consumer_t; c_consumer->consumer = consumer; - callback(pulsar_result_Ok, c_consumer); + callback(pulsar_result_Ok, c_consumer, ctx); } else { - callback((pulsar_result)result, NULL); + callback((pulsar_result)result, NULL, ctx); } } void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName, - const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer, - pulsar_subscribe_callback callback) { + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx) { client->client->subscribeAsync(topic, subscriptionName, conf->consumerConfiguration, - boost::bind(&handle_subscribe_callback, _1, _2, callback)); + boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx)); } pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic, @@ -113,30 +113,30 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t } static void handle_reader_callback(pulsar::Result result, pulsar::Reader reader, - pulsar_reader_callback callback) { + pulsar_reader_callback callback, void *ctx) { if (result == pulsar::ResultOk) { pulsar_reader_t *c_reader = new pulsar_reader_t; c_reader->reader = reader; - callback(pulsar_result_Ok, c_reader); + callback(pulsar_result_Ok, c_reader, ctx); } else { - callback((pulsar_result)result, NULL); + callback((pulsar_result)result, NULL, ctx); } } void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic, const pulsar_message_id_t *startMessageId, - pulsar_reader_configuration_t *conf, pulsar_reader_t **reader, - pulsar_reader_callback callback) { + pulsar_reader_configuration_t *conf, pulsar_reader_callback callback, + void *ctx) { client->client->createReaderAsync(topic, startMessageId->messageId, conf->conf, - boost::bind(&handle_reader_callback, _1, _2, callback)); + boost::bind(&handle_reader_callback, _1, _2, callback, ctx)); } pulsar_result pulsar_client_close(pulsar_client_t *client) { return (pulsar_result)client->client->close(); } -static void handle_client_close(pulsar::Result result, pulsar_close_callback callback) { - callback((pulsar_result)result); +static void handle_client_close(pulsar::Result result, pulsar_close_callback callback, void *ctx) { + callback((pulsar_result)result, ctx); } -void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback) { - client->client->closeAsync(boost::bind(handle_client_close, _1, callback)); +void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx) { + client->client->closeAsync(boost::bind(handle_client_close, _1, callback, ctx)); } diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc index 22021a5..dae824a 100644 --- a/pulsar-client-cpp/lib/c/c_Consumer.cc +++ b/pulsar-client-cpp/lib/c/c_Consumer.cc @@ -33,8 +33,9 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) { return (pulsar_result)consumer->consumer.unsubscribe(); } -void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) { - consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback)); +void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, + void *ctx) { + consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback, ctx)); } pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg) { @@ -67,14 +68,15 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar } void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message, - pulsar_result_callback callback) { - consumer->consumer.acknowledgeAsync(message->message, boost::bind(handle_result_callback, _1, callback)); + pulsar_result_callback callback, void *ctx) { + consumer->consumer.acknowledgeAsync(message->message, + boost::bind(handle_result_callback, _1, callback, ctx)); } void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId, - pulsar_result_callback callback) { + pulsar_result_callback callback, void *ctx) { consumer->consumer.acknowledgeAsync(messageId->messageId, - boost::bind(handle_result_callback, _1, callback)); + boost::bind(handle_result_callback, _1, callback, ctx)); } pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer, pulsar_message_t *message) { @@ -87,24 +89,24 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu } void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message, - pulsar_result_callback callback) { + pulsar_result_callback callback, void *ctx) { consumer->consumer.acknowledgeCumulativeAsync(message->message, - boost::bind(handle_result_callback, _1, callback)); + boost::bind(handle_result_callback, _1, callback, ctx)); } void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId, - pulsar_result_callback callback) { + pulsar_result_callback callback, void *ctx) { consumer->consumer.acknowledgeCumulativeAsync(messageId->messageId, - boost::bind(handle_result_callback, _1, callback)); + boost::bind(handle_result_callback, _1, callback, ctx)); } pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) { return (pulsar_result)consumer->consumer.close(); } -void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) { - consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback)); +void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx) { + consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx)); } void pulsar_consumer_free(pulsar_consumer_t *consumer) { delete consumer; } @@ -117,6 +119,6 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer) { return (pulsar_result)consumer->consumer.resumeMessageListener(); } -void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer) { +void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer) { return consumer->consumer.redeliverUnacknowledgedMessages(); } diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc index dcd0aac..42c7c73 100644 --- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc @@ -41,21 +41,23 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type( } static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg, - pulsar_message_listener listener) { + pulsar_message_listener listener, void *ctx) { pulsar_consumer_t c_consumer; c_consumer.consumer = consumer; pulsar_message_t *message = new pulsar_message_t; message->message = msg; - listener(&c_consumer, message); + listener(&c_consumer, message, ctx); } void pulsar_consumer_configuration_set_message_listener( - pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener) { + pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener, + void *ctx) { consumer_configuration->consumerConfiguration.setMessageListener( - boost::bind(message_listener_callback, _1, _2, messageListener)); + boost::bind(message_listener_callback, _1, _2, messageListener, ctx)); } -int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration) { +int pulsar_consumer_configuration_has_message_listener( + pulsar_consumer_configuration_t *consumer_configuration) { return consumer_configuration->consumerConfiguration.hasMessageListener(); } diff --git a/pulsar-client-cpp/lib/c/c_MessageId.cc b/pulsar-client-cpp/lib/c/c_MessageId.cc index 5728359..8d1e96c 100644 --- a/pulsar-client-cpp/lib/c/c_MessageId.cc +++ b/pulsar-client-cpp/lib/c/c_MessageId.cc @@ -21,6 +21,7 @@ #include "c_structs.h" #include <boost/thread/once.hpp> +#include <sstream> boost::once_flag initialized = BOOST_ONCE_INIT; @@ -42,7 +43,7 @@ const pulsar_message_id_t *pulsar_message_id_latest() { return &latest; } -const void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) { +void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) { std::string str; messageId->messageId.serialize(str); void *p = malloc(str.length()); @@ -56,3 +57,13 @@ pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t messageId->messageId = pulsar::MessageId::deserialize(strId); return messageId; } + +char *pulsar_message_id_str(pulsar_message_id_t *messageId) { + std::stringstream ss; + ss << messageId->messageId; + std::string s = ss.str(); + + return strndup(s.c_str(), s.length()); +} + +void pulsar_message_id_free(pulsar_message_id_t *messageId) { delete messageId; } diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc b/pulsar-client-cpp/lib/c/c_Producer.cc index 1de670c..784167e 100644 --- a/pulsar-client-cpp/lib/c/c_Producer.cc +++ b/pulsar-client-cpp/lib/c/c_Producer.cc @@ -38,15 +38,15 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t return (pulsar_result)producer->producer.send(msg->message); } -static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg, - pulsar_send_callback callback) { - callback((pulsar_result)result, msg); +static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg, pulsar_send_callback callback, + void *ctx) { + callback((pulsar_result)result, msg, ctx); } void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg, - pulsar_send_callback callback) { + pulsar_send_callback callback, void *ctx) { msg->message = msg->builder.build(); - producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback)); + producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback, ctx)); } int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) { @@ -57,6 +57,6 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer) { return (pulsar_result)producer->producer.close(); } -void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback) { - producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback)); +void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) { + producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx)); } diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc index 8cc9345..914fc0a 100644 --- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc +++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc @@ -107,9 +107,10 @@ pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme( class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy { pulsar_message_router _router; + void *_ctx; public: - MessageRoutingPolicy(pulsar_message_router router) : _router(router) {} + MessageRoutingPolicy(pulsar_message_router router, void *ctx) : _router(router), _ctx(ctx) {} int getPartition(const pulsar::Message &msg, const pulsar::TopicMetadata &topicMetadata) { pulsar_message_t message; @@ -118,13 +119,13 @@ class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy { pulsar_topic_metadata_t metadata; metadata.metadata = &topicMetadata; - return _router(&message, &metadata); + return _router(&message, &metadata, _ctx); } }; void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf, - pulsar_message_router router) { - conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router)); + pulsar_message_router router, void *ctx) { + conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router, ctx)); } void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf, diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc b/pulsar-client-cpp/lib/c/c_Reader.cc index bb5d69b..3f7849d 100644 --- a/pulsar-client-cpp/lib/c/c_Reader.cc +++ b/pulsar-client-cpp/lib/c/c_Reader.cc @@ -47,8 +47,8 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); } -void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback) { - reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback)); +void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) { + reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx)); } void pulsar_reader_free(pulsar_reader_t *reader) { delete reader; } diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc index b6419a6..55a6dc5 100644 --- a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc +++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc @@ -32,17 +32,17 @@ pulsar_reader_configuration_t *pulsar_reader_configuration_create() { void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configuration) { delete configuration; } static void message_listener_callback(pulsar::Reader reader, const pulsar::Message &msg, - pulsar_reader_listener listener) { + pulsar_reader_listener listener, void *ctx) { pulsar_reader_t c_reader; c_reader.reader = reader; pulsar_message_t *message = new pulsar_message_t; message->message = msg; - listener(&c_reader, message); + listener(&c_reader, message, ctx); } void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration, - pulsar_reader_listener listener) { - configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener)); + pulsar_reader_listener listener, void *ctx) { + configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener, ctx)); } int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration) { diff --git a/pulsar-client-cpp/lib/c/c_structs.h b/pulsar-client-cpp/lib/c/c_structs.h index b207ab4..a4ff193 100644 --- a/pulsar-client-cpp/lib/c/c_structs.h +++ b/pulsar-client-cpp/lib/c/c_structs.h @@ -73,8 +73,10 @@ struct _pulsar_topic_metadata { const pulsar::TopicMetadata* metadata; }; -typedef void (*pulsar_result_callback)(pulsar_result); +typedef void (*pulsar_result_callback)(pulsar_result res, void* ctx); -static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback) { - callback((pulsar_result)result); +static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback, void* ctx) { + if (callback) { + callback((pulsar_result)result, ctx); + } } \ No newline at end of file -- To stop receiving notification emails like this one, please contact mme...@apache.org.