This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 448a504  [Improve] Add error handling for the message listener (#319)
448a504 is described below

commit 448a5046c9fd70b749e8dac31f395c90498b027e
Author: Zike Yang <z...@apache.org>
AuthorDate: Thu Apr 27 14:33:28 2023 +0800

    [Improve] Add error handling for the message listener (#319)
    
    ## Motivation
    Currently, there is no error handling for the message listener. If there 
are any errors thrown from the user's listener, the program will crash.
    
    ## Modification
    * Add error handling for the message listener. The client won't crash the 
program if there are any errors in the user function. Instead, it will log 
the error.
    * Add LogUtils to the internal native code.
    * Add `GetTopic` and `GetSubscriptionName` for the internal native consumer.
---
 src/Client.h             |  5 ++--
 src/Consumer.cc          | 44 +++++++++++++++++++++++++++--------
 src/Consumer.h           |  2 ++
 src/LogUtils.h           | 32 ++++++++++++++++++++++++++
 tests/end_to_end.test.js | 60 ++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/src/Client.h b/src/Client.h
index bc20b5c..3755b6c 100644
--- a/src/Client.h
+++ b/src/Client.h
@@ -42,6 +42,9 @@ class Client : public Napi::ObjectWrap<Client> {
   static Napi::Object Init(Napi::Env env, Napi::Object exports);
   static void SetLogHandler(const Napi::CallbackInfo &info);
 
+  static void LogMessage(pulsar_logger_level_t level, const char *file, int 
line, const char *message,
+                         void *ctx);
+
   Client(const Napi::CallbackInfo &info);
   ~Client();
 
@@ -51,8 +54,6 @@ class Client : public Napi::ObjectWrap<Client> {
   std::shared_ptr<pulsar_client_t> cClient;
   std::shared_ptr<pulsar_client_configuration_t> cClientConfig;
 
-  static void LogMessage(pulsar_logger_level_t level, const char *file, int 
line, const char *message,
-                         void *ctx);
   Napi::Value CreateProducer(const Napi::CallbackInfo &info);
   Napi::Value Subscribe(const Napi::CallbackInfo &info);
   Napi::Value CreateReader(const Napi::CallbackInfo &info);
diff --git a/src/Consumer.cc b/src/Consumer.cc
index f5c3e04..328da89 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -22,10 +22,12 @@
 #include "Message.h"
 #include "MessageId.h"
 #include "ThreadSafeDeferred.h"
+#include "LogUtils.h"
 #include <pulsar/c/result.h>
 #include <atomic>
 #include <thread>
 #include <future>
+#include <sstream>
 
 Napi::FunctionReference Consumer::constructor;
 
@@ -63,6 +65,13 @@ struct MessageListenerProxyData {
       : cMessage(cMessage), consumer(consumer), callback(callback) {}
 };
 
+inline void logMessageListenerError(Consumer *consumer, const char *err) {
+  std::ostringstream ss;
+  ss << "[" << consumer->GetTopic() << "][" << consumer->GetSubscriptionName()
+     << "] Message listener error in processing message: " << err;
+  LOG_ERROR(ss.str().c_str());
+}
+
 void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, 
MessageListenerProxyData *data) {
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Consumer *consumer = data->consumer;
@@ -70,17 +79,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function 
jsCallback, MessageListe
   // `consumer` might be null in certain cases, segmentation fault might 
happend without this null check. We
   // need to handle this rare case in future.
   if (consumer) {
-    Napi::Value ret = jsCallback.Call({msg, consumer->Value()});
+    Napi::Value ret;
+    try {
+      ret = jsCallback.Call({msg, consumer->Value()});
+    } catch (std::exception &exception) {
+      logMessageListenerError(consumer, exception.what());
+    }
+
     if (ret.IsPromise()) {
       Napi::Promise promise = ret.As<Napi::Promise>();
-      Napi::Value thenValue = promise.Get("then");
-      if (thenValue.IsFunction()) {
-        Napi::Function then = thenValue.As<Napi::Function>();
-        Napi::Function callback =
-            Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { 
data->callback(); });
-        then.Call(promise, {callback});
-        return;
-      }
+      Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>();
+
+      ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const 
Napi::CallbackInfo &info) {
+                             Napi::Error error = info[0].As<Napi::Error>();
+                             logMessageListenerError(consumer, error.what());
+                           })});
+
+      promise = ret.As<Napi::Promise>();
+      Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>();
+
+      finallyFunc.Call(
+          promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo 
&info) { data->callback(); })});
+      return;
     }
   }
   data->callback();
@@ -227,6 +247,12 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo 
&info, std::shared_pt
   return deferred->Promise();
 }
 
+std::string Consumer::GetTopic() { return 
{pulsar_consumer_get_topic(this->cConsumer.get())}; }
+
+std::string Consumer::GetSubscriptionName() {
+  return {pulsar_consumer_get_subscription_name(this->cConsumer.get())};
+}
+
 // We still need a receive worker because the c api is missing the equivalent 
async definition
 class ConsumerReceiveWorker : public Napi::AsyncWorker {
  public:
diff --git a/src/Consumer.h b/src/Consumer.h
index 731ec97..1574fab 100644
--- a/src/Consumer.h
+++ b/src/Consumer.h
@@ -36,6 +36,8 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
   void SetListenerCallback(MessageListenerCallback *listener);
   void Cleanup();
   void CleanupListener();
+  std::string GetTopic();
+  std::string GetSubscriptionName();
 
  private:
   std::shared_ptr<pulsar_consumer_t> cConsumer;
diff --git a/src/LogUtils.h b/src/LogUtils.h
new file mode 100644
index 0000000..6e3180e
--- /dev/null
+++ b/src/LogUtils.h
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PULSAR_CLIENT_NODE_LOGUTILS_H
+#define PULSAR_CLIENT_NODE_LOGUTILS_H
+
+#include "Client.h"
+
+#define LOG(level, message) Client::LogMessage(pulsar_logger_level_t::level, 
__FILE__, __LINE__, message, 0)
+
+#define LOG_DEBUG(message) LOG(pulsar_DEBUG, message)
+#define LOG_INFO(message) LOG(pulsar_INFO, message)
+#define LOG_WARN(message) LOG(pulsar_WARN, message)
+#define LOG_ERROR(message) LOG(pulsar_ERROR, message)
+
+#endif  // PULSAR_CLIENT_NODE_LOGUTILS_H
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 7a5621d..dc382cf 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -386,6 +386,66 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Message Listener error handling', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+      });
+      let syncFinsh;
+      const syncPromise = new Promise((resolve) => {
+        syncFinsh = resolve;
+      });
+      let asyncFinsh;
+      const asyncPromise = new Promise((resolve) => {
+        asyncFinsh = resolve;
+      });
+      Pulsar.Client.setLogHandler((level, file, line, message) => {
+        if (level === 3) { // should be error level
+          if (message.includes('consumer1 callback expected error')) {
+            syncFinsh();
+          }
+          if (message.includes('consumer2 callback expected error')) {
+            asyncFinsh();
+          }
+        }
+      });
+
+      const topic = 'test-error-listener';
+      const producer = await client.createProducer({
+        topic,
+        batchingEnabled: false,
+      });
+
+      await producer.send('test-message');
+
+      const consumer1 = await client.subscribe({
+        topic,
+        subscription: 'sync',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        listener: (message, messageConsumer) => {
+          throw new Error('consumer1 callback expected error');
+        },
+      });
+
+      const consumer2 = await client.subscribe({
+        topic,
+        subscription: 'async',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        listener: async (message, messageConsumer) => {
+          throw new Error('consumer2 callback expected error');
+        },
+      });
+
+      await syncPromise;
+      await asyncPromise;
+
+      await consumer1.close();
+      await consumer2.close();
+      await producer.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',

Reply via email to