This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push: new 448a504 [Improve] Add error handling for the message listener (#319) 448a504 is described below commit 448a5046c9fd70b749e8dac31f395c90498b027e Author: Zike Yang <z...@apache.org> AuthorDate: Thu Apr 27 14:33:28 2023 +0800 [Improve] Add error handling for the message listener (#319) ## Motivation Currently, there is no error handling for the message listener. If there are any errors thrown from the user's listener, the program will crash. ## Modification * Add error handling for the message listener. The client won't crash the program if there are any errors in the user function. Instead, it will log as the error. * Add LogUtils to the internal native code. * Add `GetTopic` and `GetSubscriptionName` for the internal native consumer. --- src/Client.h | 5 ++-- src/Consumer.cc | 44 +++++++++++++++++++++++++++-------- src/Consumer.h | 2 ++ src/LogUtils.h | 32 ++++++++++++++++++++++++++ tests/end_to_end.test.js | 60 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 132 insertions(+), 11 deletions(-) diff --git a/src/Client.h b/src/Client.h index bc20b5c..3755b6c 100644 --- a/src/Client.h +++ b/src/Client.h @@ -42,6 +42,9 @@ class Client : public Napi::ObjectWrap<Client> { static Napi::Object Init(Napi::Env env, Napi::Object exports); static void SetLogHandler(const Napi::CallbackInfo &info); + static void LogMessage(pulsar_logger_level_t level, const char *file, int line, const char *message, + void *ctx); + Client(const Napi::CallbackInfo &info); ~Client(); @@ -51,8 +54,6 @@ class Client : public Napi::ObjectWrap<Client> { std::shared_ptr<pulsar_client_t> cClient; std::shared_ptr<pulsar_client_configuration_t> cClientConfig; - static void LogMessage(pulsar_logger_level_t level, const char *file, int line, const char *message, - void *ctx); Napi::Value CreateProducer(const Napi::CallbackInfo &info); Napi::Value Subscribe(const Napi::CallbackInfo &info); Napi::Value CreateReader(const Napi::CallbackInfo &info); diff --git a/src/Consumer.cc b/src/Consumer.cc index f5c3e04..328da89 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -22,10 +22,12 @@ #include "Message.h" #include "MessageId.h" #include "ThreadSafeDeferred.h" +#include "LogUtils.h" #include <pulsar/c/result.h> #include <atomic> #include <thread> #include <future> +#include <sstream> Napi::FunctionReference Consumer::constructor; @@ -63,6 +65,13 @@ struct MessageListenerProxyData { : cMessage(cMessage), consumer(consumer), callback(callback) {} }; +inline void logMessageListenerError(Consumer *consumer, const char *err) { + std::ostringstream ss; + ss << "[" << consumer->GetTopic() << "][" << consumer->GetSubscriptionName() + << "] Message listener error in processing message: " << err; + LOG_ERROR(ss.str().c_str()); +} + void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListenerProxyData *data) { Napi::Object msg = Message::NewInstance({}, data->cMessage); Consumer *consumer = data->consumer; @@ -70,17 +79,28 @@ void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, MessageListe // `consumer` might be null in certain cases, segmentation fault might happend without this null check. We // need to handle this rare case in future. if (consumer) { - Napi::Value ret = jsCallback.Call({msg, consumer->Value()}); + Napi::Value ret; + try { + ret = jsCallback.Call({msg, consumer->Value()}); + } catch (std::exception &exception) { + logMessageListenerError(consumer, exception.what()); + } + if (ret.IsPromise()) { Napi::Promise promise = ret.As<Napi::Promise>(); - Napi::Value thenValue = promise.Get("then"); - if (thenValue.IsFunction()) { - Napi::Function then = thenValue.As<Napi::Function>(); - Napi::Function callback = - Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); }); - then.Call(promise, {callback}); - return; - } + Napi::Function catchFunc = promise.Get("catch").As<Napi::Function>(); + + ret = catchFunc.Call(promise, {Napi::Function::New(env, [consumer](const Napi::CallbackInfo &info) { + Napi::Error error = info[0].As<Napi::Error>(); + logMessageListenerError(consumer, error.what()); + })}); + + promise = ret.As<Napi::Promise>(); + Napi::Function finallyFunc = promise.Get("finally").As<Napi::Function>(); + + finallyFunc.Call( + promise, {Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); })}); + return; } } data->callback(); @@ -227,6 +247,12 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt return deferred->Promise(); } +std::string Consumer::GetTopic() { return {pulsar_consumer_get_topic(this->cConsumer.get())}; } + +std::string Consumer::GetSubscriptionName() { + return {pulsar_consumer_get_subscription_name(this->cConsumer.get())}; +} + // We still need a receive worker because the c api is missing the equivalent async definition class ConsumerReceiveWorker : public Napi::AsyncWorker { public: diff --git a/src/Consumer.h b/src/Consumer.h index 731ec97..1574fab 100644 --- a/src/Consumer.h +++ b/src/Consumer.h @@ -36,6 +36,8 @@ class Consumer : public Napi::ObjectWrap<Consumer> { void SetListenerCallback(MessageListenerCallback *listener); void Cleanup(); void CleanupListener(); + std::string GetTopic(); + std::string GetSubscriptionName(); private: std::shared_ptr<pulsar_consumer_t> cConsumer; diff --git a/src/LogUtils.h b/src/LogUtils.h new file mode 100644 index 0000000..6e3180e --- /dev/null +++ b/src/LogUtils.h @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PULSAR_CLIENT_NODE_LOGUTILS_H +#define PULSAR_CLIENT_NODE_LOGUTILS_H + +#include "Client.h" + +#define LOG(level, message) Client::LogMessage(pulsar_logger_level_t::level, __FILE__, __LINE__, message, 0) + +#define LOG_DEBUG(message) LOG(pulsar_DEBUG, message) +#define LOG_INFO(message) LOG(pulsar_INFO, message) +#define LOG_WARN(message) LOG(pulsar_WARN, message) +#define LOG_ERROR(message) LOG(pulsar_ERROR, message) + +#endif // PULSAR_CLIENT_NODE_LOGUTILS_H diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 7a5621d..dc382cf 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -386,6 +386,66 @@ const Pulsar = require('../index.js'); await client.close(); }); + test('Message Listener error handling', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + }); + let syncFinsh; + const syncPromise = new Promise((resolve) => { + syncFinsh = resolve; + }); + let asyncFinsh; + const asyncPromise = new Promise((resolve) => { + asyncFinsh = resolve; + }); + Pulsar.Client.setLogHandler((level, file, line, message) => { + if (level === 3) { // should be error level + if (message.includes('consumer1 callback expected error')) { + syncFinsh(); + } + if (message.includes('consumer2 callback expected error')) { + asyncFinsh(); + } + } + }); + + const topic = 'test-error-listener'; + const producer = await client.createProducer({ + topic, + batchingEnabled: false, + }); + + await producer.send('test-message'); + + const consumer1 = await client.subscribe({ + topic, + subscription: 'sync', + subscriptionType: 'Shared', + subscriptionInitialPosition: 'Earliest', + listener: (message, messageConsumer) => { + throw new Error('consumer1 callback expected error'); + }, + }); + + const consumer2 = await client.subscribe({ + topic, + subscription: 'async', + subscriptionType: 'Shared', + subscriptionInitialPosition: 'Earliest', + listener: async (message, messageConsumer) => { + throw new Error('consumer2 callback expected error'); + }, + }); + + await syncPromise; + await asyncPromise; + + await consumer1.close(); + await consumer2.close(); + await producer.close(); + await client.close(); + }); + test('acknowledgeCumulative', async () => { const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650',