This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 02cddfd [cpp-client] add support of receiveAsync API (#3389) 02cddfd is described below commit 02cddfd8386860045378a16afa33820622f0759f Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Jan 21 14:51:17 2019 -0800 [cpp-client] add support of receiveAsync API (#3389) ### Motivation In many cases, a client requires a receiveAsync() API in Consumer. This API is already available in the java-client but doesn't exist in the cpp-client. ### Modification Add support for the receiveAsync() API in the cpp-client consumer. This PR is rebased and reopened from #577 --- pulsar-client-cpp/include/pulsar/Consumer.h | 13 + .../include/pulsar/ConsumerConfiguration.h | 1 + pulsar-client-cpp/lib/Consumer.cc | 9 + pulsar-client-cpp/lib/ConsumerImpl.cc | 87 +++++- pulsar-client-cpp/lib/ConsumerImpl.h | 5 + pulsar-client-cpp/lib/ConsumerImplBase.h | 1 + pulsar-client-cpp/lib/HandlerBase.h | 1 + pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 56 +++- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 5 + pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 58 +++- pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 5 + pulsar-client-cpp/tests/BasicEndToEndTest.cc | 316 ++++++++++++++++++++- 12 files changed, 532 insertions(+), 25 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index 4486515..4f1d6d4 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -100,6 +100,19 @@ class Consumer { Result receive(Message& msg, int timeoutMs); /** + * Receive a single message + * <p> + * Retrieves a message when it will be available and completes callback with received message. + * </p> + * <p> + * receiveAsync() should be called subsequently once callback gets completed with received message. + * Else it creates <i> backlog of receive requests </i> in the application. 
+ * </p> + * @param ReceiveCallback will be completed when message is available + */ + void receiveAsync(ReceiveCallback callback); + + /** * Acknowledge the reception of a single message. * * This method will block until an acknowledgement is sent to the broker. After diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index 60ffef1..69c17e4 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -36,6 +36,7 @@ class PulsarWrapper; /// Callback definition for non-data operation typedef boost::function<void(Result result)> ResultCallback; +typedef boost::function<void(Result, const Message& msg)> ReceiveCallback; /// Callback definition for MessageListener typedef boost::function<void(Consumer consumer, const Message& msg)> MessageListener; diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index d89dd73..cbe44fe 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -73,6 +73,15 @@ Result Consumer::receive(Message& msg, int timeoutMs) { return impl_->receive(msg, timeoutMs); } +void Consumer::receiveAsync(ReceiveCallback callback) { + if (!impl_) { + Message msg; + callback(ResultConsumerNotInitialized, msg); + return; + } + impl_->receiveAsync(callback); +} + Result Consumer::acknowledge(const Message& message) { return acknowledge(message.getMessageId()); } Result Consumer::acknowledge(const MessageId& messageId) { diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 35a18db..4a24dfd 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -48,6 +48,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, startMessageId_(startMessageId), // This is the initial capacity of the queue 
incomingMessages_(std::max(config_.getReceiverQueueSize(), 1)), + pendingReceives_(), availablePermits_(conf.getReceiverQueueSize()), consumerId_(client->newConsumerId()), consumerName_(config_.getConsumerName()), @@ -290,6 +291,22 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: Lock lock(mutex_); numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m); } else { + Lock lock(pendingReceiveMutex_); + // if asyncReceive is waiting then notify callback without adding to incomingMessages queue + bool asyncReceivedWaiting = !pendingReceives_.empty(); + ReceiveCallback callback; + if (asyncReceivedWaiting) { + callback = pendingReceives_.front(); + pendingReceives_.pop(); + } + lock.unlock(); + + if (asyncReceivedWaiting) { + listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback, + shared_from_this(), ResultOk, m, callback)); + return; + } + // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize Message` if (config_.getReceiverQueueSize() != 0 || (config_.getReceiverQueueSize() == 0 && messageListener_)) { @@ -316,6 +333,27 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: } } +void ConsumerImpl::failPendingReceiveCallback() { + Message msg; + Lock lock(pendingReceiveMutex_); + while (!pendingReceives_.empty()) { + ReceiveCallback callback = pendingReceives_.front(); + pendingReceives_.pop(); + listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback, + shared_from_this(), ResultAlreadyClosed, msg, callback)); + } + lock.unlock(); +} + +void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg, + const ReceiveCallback& callback) { + if (result == ResultOk && config_.getReceiverQueueSize() != 0) { + messageProcessed(msg); + unAckedMessageTrackerPtr_->add(msg.getMessageId()); + } + callback(result, msg); +} + // Zero Queue size is not supported with Batch Messages uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage) { @@ -345,8 +383,19 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection } } - // Regular path, append individual message to incoming messages queue - incomingMessages_.push(msg); + // + Lock lock(pendingReceiveMutex_); + if (!pendingReceives_.empty()) { + ReceiveCallback callback = pendingReceives_.front(); + pendingReceives_.pop(); + lock.unlock(); + listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback, + shared_from_this(), ResultOk, msg, callback)); + } else { + // Regular path, append individual message to incoming messages queue + incomingMessages_.push(msg); + lock.unlock(); + } } if (skippedMessages > 0) { @@ -509,6 +558,37 @@ Result ConsumerImpl::receive(Message& msg) { return res; } +void ConsumerImpl::receiveAsync(ReceiveCallback& callback) { + Message msg; + + // fail the callback if consumer is closing or closed + Lock stateLock(mutex_); + if (state_ != Ready) { + callback(ResultAlreadyClosed, msg); + return; + } + stateLock.unlock(); + + Lock lock(pendingReceiveMutex_); + if (incomingMessages_.pop(msg, milliseconds(0))) { + lock.unlock(); + messageProcessed(msg); + unAckedMessageTrackerPtr_->add(msg.getMessageId()); + callback(ResultOk, msg); + } else { + pendingReceives_.push(callback); + lock.unlock(); + + if (config_.getReceiverQueueSize() == 0) { + ClientConnectionPtr currentCnx = getCnx().lock(); + if (currentCnx) { + LOG_DEBUG(getName() << "Send more permits: " << 1); + receiveMessages(currentCnx, 1); + } + } + } +} + Result ConsumerImpl::receiveHelper(Message& msg) { { Lock lock(mutex_); @@ -747,6 +827,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) { if (!callback.empty()) { future.addListener(boost::bind(&ConsumerImpl::handleClose, shared_from_this(), _1, callback)); } + + // fail pendingReceive callback + failPendingReceiveCallback(); } void 
ConsumerImpl::handleClose(Result result, ResultCallback callback) { diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index fcdaed1..62ac9da 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -42,6 +42,7 @@ #include <lib/BrokerConsumerStatsImpl.h> #include <lib/stats/ConsumerStatsImpl.h> #include <lib/stats/ConsumerStatsDisabled.h> +#include <queue> using namespace pulsar; @@ -92,6 +93,7 @@ class ConsumerImpl : public ConsumerImplBase, virtual const std::string& getTopic() const; virtual Result receive(Message& msg); virtual Result receive(Message& msg, int timeout); + virtual void receiveAsync(ReceiveCallback& callback); Result fetchSingleMessageFromBroker(Message& msg); virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback); virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback); @@ -140,6 +142,8 @@ class ConsumerImpl : public ConsumerImplBase, Result receiveHelper(Message& msg); Result receiveHelper(Message& msg, int timeout); void statsCallback(Result, ResultCallback, proto::CommandAck_AckType); + void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback); + void failPendingReceiveCallback(); Optional<MessageId> clearReceiveQueue(); @@ -156,6 +160,7 @@ class ConsumerImpl : public ConsumerImplBase, Optional<MessageId> lastDequedMessage_; UnboundedBlockingQueue<Message> incomingMessages_; + std::queue<ReceiveCallback> pendingReceives_; int availablePermits_; uint64_t consumerId_; std::string consumerName_; diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index 11f2fc6..0b1b6fb 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -35,6 +35,7 @@ class ConsumerImplBase { virtual const std::string& getTopic() const = 0; virtual Result receive(Message& msg) = 0; virtual Result 
receive(Message& msg, int timeout) = 0; + virtual void receiveAsync(ReceiveCallback& callback) = 0; virtual void unsubscribeAsync(ResultCallback callback) = 0; virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) = 0; virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) = 0; diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index 4ad0800..f1a23fc 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -91,6 +91,7 @@ class HandlerBase { const std::string topic_; ClientConnectionWeakPtr connection_; boost::mutex mutex_; + boost::mutex pendingReceiveMutex_; ptime creationTimestamp_; const TimeDuration operationTimeut_; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 69c3cc0..329442a 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -34,6 +34,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std messages_(1000), listenerExecutor_(client->getListenerExecutorProvider()->get()), messageListener_(conf.getMessageListener()), + pendingReceives_(), namespaceName_(topicName ? 
topicName->getNamespaceName() : boost::shared_ptr<NamespaceName>()), lookupServicePtr_(lookupServicePtr), numberTopicPartitions_(boost::make_shared<std::atomic<int>>(0)), @@ -385,6 +386,9 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { consumerPtr->closeAsync(boost::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose, shared_from_this(), _1, topicPartitionName, callback)); } + + // fail pending recieve + failPendingReceiveCallback(); } void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName, @@ -429,11 +433,23 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& << " message:" << msg.getDataAsString()); const std::string& topicPartitionName = consumer.getTopic(); msg.impl_->setTopicName(topicPartitionName); - messages_.push(msg); - if (messageListener_) { - listenerExecutor_->postWork( - boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer)); + Lock lock(pendingReceiveMutex_); + if (!pendingReceives_.empty()) { + ReceiveCallback callback = pendingReceives_.front(); + pendingReceives_.pop(); + lock.unlock(); + unAckedMessageTrackerPtr_->add(msg.getMessageId()); + listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg)); + } else { + if (messages_.full()) { + lock.unlock(); + } + messages_.push(msg); + if (messageListener_) { + listenerExecutor_->postWork( + boost::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer)); + } } } @@ -489,6 +505,38 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { } } +void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { + Message msg; + + // fail the callback if consumer is closing or closed + Lock stateLock(mutex_); + if (state_ != Ready) { + callback(ResultAlreadyClosed, msg); + return; + } + stateLock.unlock(); + + Lock lock(pendingReceiveMutex_); + if (messages_.pop(msg, milliseconds(0))) { + lock.unlock(); + 
unAckedMessageTrackerPtr_->add(msg.getMessageId()); + callback(ResultOk, msg); + } else { + pendingReceives_.push(callback); + } +} + +void MultiTopicsConsumerImpl::failPendingReceiveCallback() { + Message msg; + Lock lock(pendingReceiveMutex_); + while (!pendingReceives_.empty()) { + ReceiveCallback callback = pendingReceives_.front(); + pendingReceives_.pop(); + listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg)); + } + lock.unlock(); +} + void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { if (state_ != Ready) { callback(ResultAlreadyClosed); diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index 3b1d985..ddea13a 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -22,6 +22,7 @@ #include "ClientImpl.h" #include "BlockingQueue.h" #include <vector> +#include <queue> #include <boost/shared_ptr.hpp> #include <boost/thread/mutex.hpp> #include "boost/enable_shared_from_this.hpp" @@ -58,6 +59,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, virtual const std::string& getName() const; virtual Result receive(Message& msg); virtual Result receive(Message& msg, int timeout); + virtual void receiveAsync(ReceiveCallback& callback); virtual void unsubscribeAsync(ResultCallback callback); virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback); virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback); @@ -91,6 +93,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, ConsumerMap consumers_; std::map<std::string, int> topicsPartitions_; boost::mutex mutex_; + boost::mutex pendingReceiveMutex_; MultiTopicsConsumerState state_; boost::shared_ptr<std::atomic<int>> numberTopicPartitions_; LookupServicePtr lookupServicePtr_; @@ -100,6 +103,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, 
Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_; UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_; const std::vector<std::string>& topics_; + std::queue<ReceiveCallback> pendingReceives_; /* methods */ void setState(MultiTopicsConsumerState state); @@ -112,6 +116,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, void messageReceived(Consumer consumer, const Message& msg); void internalListener(Consumer consumer); void receiveMessages(); + void failPendingReceiveCallback(); void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic, boost::shared_ptr<std::atomic<int>> topicsNeedCreate); diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index e3f93c7..c0edf11 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -37,6 +37,7 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std messages_(1000), listenerExecutor_(client->getListenerExecutorProvider()->get()), messageListener_(conf.getMessageListener()), + pendingReceives_(), topic_(topicName->toString()) { std::stringstream consumerStrStream; consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << "," @@ -95,6 +96,27 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) { } } +void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) { + Message msg; + + // fail the callback if consumer is closing or closed + Lock stateLock(mutex_); + if (state_ != Ready) { + callback(ResultAlreadyClosed, msg); + return; + } + stateLock.unlock(); + + Lock lock(pendingReceiveMutex_); + if (messages_.pop(msg, milliseconds(0))) { + lock.unlock(); + unAckedMessageTrackerPtr_->add(msg.getMessageId()); + callback(ResultOk, msg); + } else { + pendingReceives_.push(callback); + } +} + void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback 
callback) { LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing"); // change state to Closing, so that no Ready state operation is permitted during unsubscribe @@ -283,6 +305,9 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) { } } } + + // fail pending recieve + failPendingReceiveCallback(); } void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) { @@ -316,13 +341,38 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition()); const std::string& topicPartitionName = consumer.getTopic(); msg.impl_->setTopicName(topicPartitionName); - messages_.push(msg); - if (messageListener_) { - listenerExecutor_->postWork( - boost::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer)); + // messages_ is a blocking queue: if queue is already full then no need of lock as receiveAsync already + // gets available-msg and no need to put request in pendingReceives_ + Lock lock(pendingReceiveMutex_); + if (!pendingReceives_.empty()) { + ReceiveCallback callback = pendingReceives_.front(); + pendingReceives_.pop(); + lock.unlock(); + unAckedMessageTrackerPtr_->add(msg.getMessageId()); + listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg)); + } else { + if (messages_.full()) { + lock.unlock(); + } + messages_.push(msg); + if (messageListener_) { + listenerExecutor_->postWork( + boost::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer)); + } } } +void PartitionedConsumerImpl::failPendingReceiveCallback() { + Message msg; + Lock lock(pendingReceiveMutex_); + while (!pendingReceives_.empty()) { + ReceiveCallback callback = pendingReceives_.front(); + pendingReceives_.pop(); + listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg)); + } + lock.unlock(); +} + void PartitionedConsumerImpl::internalListener(Consumer 
consumer) { Message m; messages_.pop(m); diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 606c007..c20a7df 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -21,6 +21,7 @@ #include "ConsumerImpl.h" #include "ClientImpl.h" #include <vector> +#include <queue> #include <boost/shared_ptr.hpp> #include <boost/thread/mutex.hpp> #include "boost/enable_shared_from_this.hpp" @@ -52,6 +53,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, virtual const std::string& getTopic() const; virtual Result receive(Message& msg); virtual Result receive(Message& msg, int timeout); + virtual void receiveAsync(ReceiveCallback& callback); virtual void unsubscribeAsync(ResultCallback callback); virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback); virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback); @@ -80,6 +82,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, typedef std::vector<ConsumerImplPtr> ConsumerList; ConsumerList consumers_; boost::mutex mutex_; + boost::mutex pendingReceiveMutex_; PartitionedConsumerState state_; unsigned int unsubscribedSoFar_; BlockingQueue<Message> messages_; @@ -99,8 +102,10 @@ class PartitionedConsumerImpl : public ConsumerImplBase, void messageReceived(Consumer consumer, const Message& msg); void internalListener(Consumer consumer); void receiveMessages(); + void failPendingReceiveCallback(); Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_; UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_; + std::queue<ReceiveCallback> pendingReceives_; }; typedef boost::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr; typedef boost::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr; diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
index a594a0a..9a34fbe 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -25,6 +25,7 @@ #include <lib/Latch.h> #include <sstream> #include "boost/date_time/posix_time/posix_time.hpp" +#include "boost/enable_shared_from_this.hpp" #include "CustomRoutingPolicy.h" #include <boost/thread.hpp> #include <boost/thread/mutex.hpp> @@ -59,19 +60,41 @@ static void messageListenerFunctionWithoutAck(Consumer consumer, const Message& latch.countdown(); } -static void sendCallBack(Result r, const Message& msg, std::string prefix) { +static void sendCallBack(Result r, const Message& msg, std::string prefix, int* count) { + static boost::mutex sendMutex_; + sendMutex_.lock(); ASSERT_EQ(r, ResultOk); - std::string messageContent = prefix + boost::lexical_cast<std::string>(globalTestBatchMessagesCounter++); + std::string messageContent = prefix + boost::lexical_cast<std::string>(*count); ASSERT_EQ(messageContent, msg.getDataAsString()); LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString()); + *count += 1; + sendMutex_.unlock(); +} + +static void receiveCallBack(Result r, const Message& msg, std::string& messageContent, bool checkContent, + bool* isFailed, int* count) { + static boost::mutex receiveMutex_; + receiveMutex_.lock(); + + if (r == ResultOk) { + LOG_DEBUG("received msg " << msg.getDataAsString() << " expected: " << messageContent + << " count =" << *count); + if (checkContent) { + ASSERT_EQ(messageContent, msg.getDataAsString()); + } + *count += 1; + } else { + *isFailed = true; + } + receiveMutex_.unlock(); } static void sendCallBack(Result r, const Message& msg, std::string prefix, double percentage, - uint64_t delayInMicros) { + uint64_t delayInMicros, int* count) { if ((rand() % 100) <= percentage) { usleep(delayInMicros); } - sendCallBack(r, msg, prefix); + sendCallBack(r, msg, prefix, count); } class EncKeyReader : public CryptoKeyReader { @@ -158,13 +181,14 @@ TEST(BasicEndToEndTest, 
testBatchMessages) { // Send Asynchronously std::string prefix = "msg-batch-"; + int msgCount = 0; for (int i = 0; i < numOfMessages; i++) { std::string messageContent = prefix + boost::lexical_cast<std::string>(i); Message msg = MessageBuilder() .setContent(messageContent) .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) .build(); - producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount)); LOG_DEBUG("sending message " << messageContent); } @@ -172,15 +196,16 @@ TEST(BasicEndToEndTest, testBatchMessages) { int i = 0; while (consumer.receive(receivedMsg, 5000) == ResultOk) { std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i); - LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = " - << receivedMsg.getMessageId() << "]"); + LOG_INFO("Received Message with [ content - " + << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"); + LOG_INFO("msg-index " << receivedMsg.getProperty("msgIndex") << ", expected " + << boost::lexical_cast<std::string>(i)); ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++)); ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString()); ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); } // Number of messages produced - ASSERT_EQ(globalTestBatchMessagesCounter, numOfMessages); - globalTestBatchMessagesCounter = 0; + ASSERT_EQ(msgCount, numOfMessages); // Number of messages consumed ASSERT_EQ(i, numOfMessages); } @@ -983,13 +1008,14 @@ TEST(BasicEndToEndTest, testStatsLatencies) { // Send Asynchronously std::string prefix = "msg-stats-"; + int count = 0; for (int i = 0; i < numOfMessages; i++) { std::string messageContent = prefix + boost::lexical_cast<std::string>(i); Message msg = MessageBuilder() .setContent(messageContent) .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i)) .build(); - producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 2 * 1e3)); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 2 * 1e3, &count)); LOG_DEBUG("sending message " << messageContent); } @@ -2004,13 +2030,14 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) { // Send Asynchronously of half the messages std::string prefix = "msg-batch-async"; + int msgCount = 0; for (int i = 0; i < numOfMessages / 2; i++) { std::string messageContent = prefix + boost::lexical_cast<std::string>(i); Message msg = MessageBuilder() .setContent(messageContent) .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) .build(); - producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount)); LOG_DEBUG("async sending message " << messageContent); } LOG_INFO("sending first half messages in async, should timeout to receive"); @@ -2026,7 +2053,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) { .setContent(messageContent) .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) .build(); - producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount)); LOG_DEBUG("async sending message " << messageContent); } LOG_INFO("sending the other half messages in async, should able to receive"); @@ -2039,7 +2066,8 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) { while (consumer.receive(receivedMsg, 1000) == ResultOk) { std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i); LOG_INFO("Received Message with [ content - " - << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"); + << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]" + << "property = " << receivedMsg.getProperty("msgIndex")); 
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++)); ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString()); ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); @@ -2234,13 +2262,14 @@ TEST(BasicEndToEndTest, testFlushInProducer) { // Send Asynchronously of half the messages std::string prefix = "msg-batch-async"; + int msgCount = 0; for (int i = 0; i < numOfMessages / 2; i++) { std::string messageContent = prefix + boost::lexical_cast<std::string>(i); Message msg = MessageBuilder() .setContent(messageContent) .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) .build(); - producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount)); LOG_DEBUG("async sending message " << messageContent); } LOG_INFO("sending half of messages in async, should timeout to receive"); @@ -2265,7 +2294,7 @@ TEST(BasicEndToEndTest, testFlushInProducer) { .setContent(messageContent) .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) .build(); - producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount)); LOG_DEBUG("async sending message " << messageContent); } LOG_INFO( @@ -2385,3 +2414,260 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { producer.close(); client.shutdown(); } + +TEST(BasicEndToEndTest, testReceiveAsync) { + ClientConfiguration config; + Client client(lookupUrl); + std::string topicName = "persistent://public/default/receiveAsync"; + std::string subName = "my-sub-name"; + Producer producer; + + Promise<Result, Producer> producerPromise; + client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise)); + Future<Result, Producer> producerFuture = producerPromise.getFuture(); + Result result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + Consumer consumer; + Promise<Result, 
Consumer> consumerPromise; + client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + std::string temp = producer.getTopic(); + ASSERT_EQ(temp, topicName); + temp = consumer.getTopic(); + ASSERT_EQ(temp, topicName); + ASSERT_EQ(consumer.getSubscriptionName(), subName); + + std::string content = "msg-1-content"; + int count = 0; + int totalMsgs = 5; + bool isFailed = false; + for (int i = 0; i < totalMsgs; i++) { + consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, true, &isFailed, &count)); + } + // Send synchronously + for (int i = 0; i < totalMsgs; i++) { + Message msg = MessageBuilder().setContent(content).build(); + result = producer.send(msg); + ASSERT_EQ(ResultOk, result); + } + + // check strategically + for (int i = 0; i < 3; i++) { + if (count == totalMsgs) { + break; + } + usleep(1 * 1000 * 1000); + } + ASSERT_FALSE(isFailed); + ASSERT_EQ(count, totalMsgs); + client.shutdown(); +}

/**
 * receiveAsync() against a 3-partition topic.
 *
 * Registers totalMsgs receiveAsync() requests up front, then publishes totalMsgs
 * messages synchronously and expects every request to be completed without the
 * failure flag being raised.
 */
TEST(BasicEndToEndTest, testPartitionedReceiveAsync) {
    Client client(lookupUrl);
    std::string topicName = "persistent://public/default/receiveAsync-partition";

    // call admin api to make it partitioned
    std::string url = adminUrl + "admin/v2/persistent/public/default/receiveAsync-partition/partitions";
    int res = makePutRequest(url, "3");

    LOG_INFO("res = " << res);
    // Only 204 and 409 are accepted (presumably "created" / "already exists" - TODO confirm).
    ASSERT_TRUE(res == 204 || res == 409);

    Producer producer;
    Result result = client.createProducer(topicName, producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    result = client.subscribe(topicName, "subscription-A", consumer);
    ASSERT_EQ(ResultOk, result);

    int totalMsgs = 10;
    // Empty content together with the `false` flag: presumably receiveCallBack skips the
    // payload comparison in this mode - verify against its definition.
    std::string content;
    int count = 0;
    bool isFailed = false;
    // NOTE(review): count/isFailed are handed to the callback by pointer and polled below
    // without synchronisation; if callbacks run on a client thread this is a data race - confirm.
    for (int i = 0; i < totalMsgs; i++) {
        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
    }

    for (int i = 0; i < totalMsgs; i++) {
        // The time-of-day in nanoseconds doubles as payload and partition key.
        boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time());
        long nanoSeconds = t.time_of_day().total_nanoseconds();
        std::stringstream ss;
        ss << nanoSeconds;
        Message msg = MessageBuilder().setContent(ss.str()).setPartitionKey(ss.str()).build();
        ASSERT_EQ(ResultOk, producer.send(msg));
        LOG_DEBUG("Message Timestamp is " << msg.getPublishTimestamp());
        LOG_DEBUG("Message is " << msg);
    }

    // Poll for up to 3 seconds until every pending receive has been completed.
    for (int i = 0; i < 3; i++) {
        if (count == totalMsgs) {
            break;
        }
        usleep(1 * 1000 * 1000);
    }
    ASSERT_FALSE(isFailed);
    ASSERT_EQ(count, totalMsgs);
    client.shutdown();
}

/**
 * receiveAsync() with a batching, LZ4-compressing producer.
 *
 * Registers numOfMessages receiveAsync() requests, publishes numOfMessages messages
 * asynchronously in batches of batchSize, and expects every request to be completed
 * without the failure flag being raised.
 */
TEST(BasicEndToEndTest, testBatchMessagesReceiveAsync) {
    Client client(lookupUrl);
    std::string topicName = "persistent://public/default/receiveAsync-batch";
    std::string subName = "subscription-name";
    Producer producer;

    // Enable batching on producer side
    int batchSize = 2;
    int numOfMessages = 100;

    ProducerConfiguration conf;
    conf.setCompressionType(CompressionLZ4);
    conf.setBatchingMaxMessages(batchSize);
    conf.setBatchingEnabled(true);
    conf.setBlockIfQueueFull(true);

    Promise<Result, Producer> producerPromise;
    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
    Future<Result, Producer> producerFuture = producerPromise.getFuture();
    Result result = producerFuture.get(producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    // handling dangling subscriptions: drop the subscription and create it again.
    // Both steps are checked so that a failed re-subscribe is reported here rather than
    // as a count mismatch at the end of the test.
    result = consumer.unsubscribe();
    ASSERT_EQ(ResultOk, result);
    result = client.subscribe(topicName, subName, consumer);
    ASSERT_EQ(ResultOk, result);

    std::string temp = producer.getTopic();
    ASSERT_EQ(temp, topicName);
    temp = consumer.getTopic();
    ASSERT_EQ(temp, topicName);
    ASSERT_EQ(consumer.getSubscriptionName(), subName);

    std::string content;
    int count = 0;
    bool isFailed = false;
    for (int i = 0; i < numOfMessages; i++) {
        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
    }

    // Send Asynchronously
    std::string prefix = "msg-batch-";
    int msgCount = 0;
    for (int i = 0; i < numOfMessages; i++) {
        std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
        Message msg = MessageBuilder()
                          .setContent(messageContent)
                          .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
                          .build();
        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
        LOG_DEBUG("sending message " << messageContent);
    }

    // Poll for up to 3 seconds until every pending receive has been completed.
    for (int i = 0; i < 3; i++) {
        if (count == numOfMessages) {
            break;
        }
        usleep(1 * 1000 * 1000);
    }
    ASSERT_FALSE(isFailed);
    ASSERT_EQ(count, numOfMessages);
    // Shut the client down like the sibling receiveAsync tests do.
    client.shutdown();
}

/**
 * receiveAsync() on a consumer that is being closed / already closed.
 *
 * One request is issued before close() and one after it; both are expected to raise
 * their failure flag, and the counter of the post-close request must stay at 0.
 */
TEST(BasicEndToEndTest, testReceiveAsyncFailedConsumer) {
    Client client(lookupUrl);
    std::string topicName = "persistent://public/default/receiveAsync-failed";
    std::string subName = "my-sub-name";

    Consumer consumer;
    Promise<Result, Consumer> consumerPromise;
    client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
    Result result = consumerFuture.get(consumer);
    ASSERT_EQ(ResultOk, result);

    bool isFailedOnConsumerClosing = false;
    std::string content;
    int closingCount = 0;
    // Issued before close(): the assertions below expect this request to be reported as failed.
    consumer.receiveAsync(
        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCount));

    // close consumer
    consumer.close();
    bool isFailedOnConsumerClosed = false;
    int count = 0;
    // Issued on the closed consumer: callback should immediately fail
    consumer.receiveAsync(
        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));

    // Poll for up to 3 seconds until both callbacks have reported a failure.
    for (int i = 0; i < 3; i++) {
        if (isFailedOnConsumerClosing && isFailedOnConsumerClosed) {
            break;
        }
        usleep(1 * 1000 * 1000);
    }

    ASSERT_TRUE(isFailedOnConsumerClosing);
    ASSERT_TRUE(isFailedOnConsumerClosed);
    ASSERT_EQ(count, 0);

    client.shutdown();
}

/**
 * Partitioned-topic (3 partitions) variant of testReceiveAsyncFailedConsumer.
 *
 * One request is issued before close() and one after it; both are expected to raise
 * their failure flag, and the counter of the post-close request must stay at 0.
 */
TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
    Client client(lookupUrl);
    std::string topicName = "persistent://public/default/receiveAsync-fail-partition";

    // call admin api to make it partitioned
    std::string url = adminUrl + "admin/v2/persistent/public/default/receiveAsync-fail-partition/partitions";
    int res = makePutRequest(url, "3");

    LOG_INFO("res = " << res);
    // Only 204 and 409 are accepted (presumably "created" / "already exists" - TODO confirm).
    ASSERT_TRUE(res == 204 || res == 409);

    Consumer consumer;
    Result result = client.subscribe(topicName, "subscription-A", consumer);
    ASSERT_EQ(ResultOk, result);

    bool isFailedOnConsumerClosing = false;
    std::string content;
    int closingCount = 0;
    // Issued before close(): the assertions below expect this request to be reported as failed.
    consumer.receiveAsync(
        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCount));
    // close consumer
    consumer.close();

    int count = 0;
    bool isFailedOnConsumerClosed = false;
    // Issued on the closed consumer: expected to be reported as failed as well.
    consumer.receiveAsync(
        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));

    // Poll for up to 3 seconds until both callbacks have reported a failure.
    for (int i = 0; i < 3; i++) {
        if (isFailedOnConsumerClosing && isFailedOnConsumerClosed) {
            break;
        }
        usleep(1 * 1000 * 1000);
    }

    ASSERT_TRUE(isFailedOnConsumerClosing);
    ASSERT_TRUE(isFailedOnConsumerClosed);
    ASSERT_EQ(count, 0);
    client.shutdown();
}