This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new fd29759  feat(consumer): remove event if consumer service shutdown 
(#233)
fd29759 is described below

commit fd29759a0280c4e2ffc63c1ac4578700dd68018e
Author: dinglei <libya_...@163.com>
AuthorDate: Thu Jan 16 15:19:36 2020 +0800

    feat(consumer): remove event if consumer service shutdown (#233)
---
 src/consumer/ConsumeMessageConcurrentlyService.cpp | 10 ++++++++--
 src/consumer/DefaultMQPushConsumer.cpp             | 16 +++++++++++-----
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp 
b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 9c3a05b..deda8ac 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -73,8 +73,11 @@ void 
ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
              request->m_messageQueue.toString().c_str());
     return;
   }
-  if (!request->isDropped()) {
+  if (!request->isDropped() && !m_ioService.stopped()) {
     
m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest,
 this, request, msgs));
+  } else {
+    LOG_INFO("IOService stopped or Pull request for %s is dropped, will not 
post ConsumeRequest.",
+             request->m_messageQueue.toString().c_str());
   }
 }
 void 
ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest>
 pullRequest,
@@ -93,13 +96,16 @@ void 
ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_pt
              (request->m_messageQueue).toString().c_str());
     return;
   }
-  if (!request->isDropped()) {
+  if (!request->isDropped() && !m_ioService.stopped()) {
     boost::asio::deadline_timer* t =
         new boost::asio::deadline_timer(m_ioService, 
boost::posix_time::milliseconds(millis));
     t->async_wait(
         
boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest), 
this, t, request, msgs));
     LOG_INFO("Submit Message to Consumer [%s] Later and Sleep [%d]ms.", 
(request->m_messageQueue).toString().c_str(),
              millis);
+  } else {
+    LOG_INFO("IOService stopped or Pull request for %s is dropped, will not 
post delay ConsumeRequest.",
+             request->m_messageQueue.toString().c_str());
   }
 }
 
diff --git a/src/consumer/DefaultMQPushConsumer.cpp 
b/src/consumer/DefaultMQPushConsumer.cpp
index 8e2541e..9da0ca8 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -575,11 +575,17 @@ bool 
DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr<PullRequest>
     LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
request->m_messageQueue.toString().c_str());
     return false;
   }
-  boost::asio::deadline_timer* t =
-      new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(millis));
-  
t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
 this, t, request));
-  LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", 
(request->m_messageQueue).toString().c_str(), millis);
-  return true;
+  if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
+    boost::asio::deadline_timer* t =
+        new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(millis));
+    
t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
 this, t, request));
+    LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", 
(request->m_messageQueue).toString().c_str(), millis);
+    return true;
+  } else {
+    LOG_WARN("Service or TaskQueue shutdown, produce PullRequest of mq:%s 
failed",
+             request->m_messageQueue.toString().c_str());
+    return false;
+  }
 }
 
 bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> 
pullRequest) {

Reply via email to