This is an automated email from the ASF dual-hosted git repository. lamberliu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push: new 7866569 [TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67) 7866569 is described below commit 786656958628b27f48c20745eb9697c329ea72dc Author: gosonzhang <4675...@qq.com> AuthorDate: Thu Apr 30 09:07:00 2020 +0000 [TUBEMQ-85] There is NPE when creating PullConsumer with TubeSingleSessionFactory (#67) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../client/factory/TubeMultiSessionFactory.java | 15 ++++++++++++- .../client/factory/TubeSingleSessionFactory.java | 26 ++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java index 3dbb561..1cf91be 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeMultiSessionFactory.java @@ -34,12 +34,13 @@ public class TubeMultiSessionFactory implements MessageSessionFactory { private final NettyClientFactory clientFactory = new NettyClientFactory(); private final TubeBaseSessionFactory baseSessionFactory; - private final AtomicBoolean isShutDown = new AtomicBoolean(false); + private final AtomicBoolean isShutDown = new AtomicBoolean(true); public TubeMultiSessionFactory(final TubeClientConfig tubeClientConfig) throws TubeClientException { RpcConfig config = TubeClientConfigUtils.getRpcConfigByClientConfig(tubeClientConfig, false); clientFactory.configure(config); baseSessionFactory = new TubeBaseSessionFactory(clientFactory, tubeClientConfig); + isShutDown.set(false); } @Override @@ -52,23 +53,35 @@ public class TubeMultiSessionFactory implements MessageSessionFactory { @Override public <T extends Shutdownable> void removeClient(final T client) { + if (baseSessionFactory == null) { + return; + } this.baseSessionFactory.removeClient(client); } @Override public MessageProducer createProducer() throws TubeClientException { + if (isShutDown.get()) { + throw new TubeClientException("Please initialize the object first!"); + } return this.baseSessionFactory.createProducer(); } @Override public PushMessageConsumer createPushConsumer(final ConsumerConfig consumerConfig) throws TubeClientException { + if (isShutDown.get()) { + throw new TubeClientException("Please initialize the object first!"); + } return this.baseSessionFactory.createPushConsumer(consumerConfig); } @Override public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig) throws TubeClientException { + if (isShutDown.get()) { + throw new TubeClientException("Please initialize the object first!"); + } return this.baseSessionFactory.createPullConsumer(consumerConfig); } diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java index a574bf4..1532645 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/factory/TubeSingleSessionFactory.java @@ -17,6 +17,7 @@ package org.apache.tubemq.client.factory; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.tubemq.client.config.ConsumerConfig; import org.apache.tubemq.client.config.TubeClientConfig; @@ -33,6 +34,7 @@ import org.apache.tubemq.corerpc.netty.NettyClientFactory; public class TubeSingleSessionFactory implements MessageSessionFactory { private static final NettyClientFactory clientFactory = new NettyClientFactory(); + private static final AtomicBoolean isShutDown = new AtomicBoolean(true); private static final AtomicLong referenceCounter = new AtomicLong(0); private static TubeBaseSessionFactory baseSessionFactory; @@ -42,37 +44,61 @@ public class TubeSingleSessionFactory implements MessageSessionFactory { RpcConfig config = TubeClientConfigUtils.getRpcConfigByClientConfig(tubeClientConfig, true); clientFactory.configure(config); baseSessionFactory = new TubeBaseSessionFactory(clientFactory, tubeClientConfig); + isShutDown.set(false); + } + while (isShutDown.get()) { + try { + Thread.sleep(50); + } catch (Throwable e) { + break; + } } } @Override public void shutdown() throws TubeClientException { + if (isShutDown.get()) { + throw new TubeClientException("Please initialize the object first!"); + } if (referenceCounter.decrementAndGet() > 0) { return; } baseSessionFactory.shutdown(); clientFactory.shutdown(); + isShutDown.set(true); } @Override public <T extends Shutdownable> void removeClient(final T client) { + if (baseSessionFactory == null) { + return; + } baseSessionFactory.removeClient(client); } @Override public MessageProducer createProducer() throws TubeClientException { + if (isShutDown.get()) { + throw new TubeClientException("Please initialize the object first!"); + } return baseSessionFactory.createProducer(); } @Override public PushMessageConsumer createPushConsumer(ConsumerConfig consumerConfig) throws TubeClientException { + if (isShutDown.get()) { + throw new TubeClientException("Please initialize the object first!"); + } return baseSessionFactory.createPushConsumer(consumerConfig); } @Override public PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig) throws TubeClientException { + if (isShutDown.get()) { + throw new TubeClientException("Please initialize the object first!"); + } return baseSessionFactory.createPullConsumer(consumerConfig); }