http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java new file mode 100644 index 0000000..1c40c0e --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.broker.filtersrv; + +import org.slf4j.Logger; + + +public class FilterServerUtil { + public static void callShell(final String shellString, final Logger log) { + Process process = null; + try { + String[] cmdArray = splitShellString(shellString); + process = Runtime.getRuntime().exec(cmdArray); + process.waitFor(); + log.info("callShell: <{}> OK", shellString); + } catch (Throwable e) { + log.error("callShell: readLine IOException, " + shellString, e); + } finally { + if (null != process) + process.destroy(); + } + } + + private static String[] splitShellString(final String shellString) { + String[] split = shellString.split(" "); + return split; + } +}
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java (new file mode 100644, index 0000000..57a451f)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.latency;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
 * Periodically answers queued send requests with {@code SYSTEM_BUSY} instead of letting
 * them wait: either because the message store reports a busy OS page cache, or because
 * a request has sat in the send queue longer than the configured limit.
 *
 * @author shijia.wxr
 */
public class BrokerFastFailure {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "BrokerFastFailureScheduledThread"));
    private final BrokerController brokerController;

    public BrokerFastFailure(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Unwraps the {@link RequestTask} carried by a queued {@link FutureTaskExt}.
     *
     * @param runnable an element taken from the send thread pool queue
     * @return the wrapped request task, or {@code null} (after logging) when
     *         {@code runnable} is not a {@link FutureTaskExt} holding a {@link RequestTask}
     */
    public static RequestTask castRunnable(final Runnable runnable) {
        if (runnable instanceof FutureTaskExt) {
            final Runnable inner = ((FutureTaskExt<?>) runnable).getRunnable();
            if (inner instanceof RequestTask) {
                return (RequestTask) inner;
            }
        }

        log.error(String.format("castRunnable exception, %s", null == runnable ? "null" : runnable.getClass().getName()));
        return null;
    }

    /**
     * Schedules the clean-up task: first run after 1000 ms, then every 10 ms.
     */
    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    cleanExpiredRequest();
                } catch (Throwable e) {
                    // An exception escaping a fixed-rate task suppresses every later
                    // execution, so it must be contained here.
                    log.error("cleanExpiredRequest exception", e);
                }
            }
        }, 1000, 10, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops scheduling further clean-up runs.
     */
    public void shutdown() {
        this.scheduledExecutorService.shutdown();
    }

    /**
     * One clean-up pass: drain the queue while the page cache is busy, then reject
     * requests at the head of the queue that waited too long.
     */
    private void cleanExpiredRequest() {
        this.rejectQueuedRequestsWhilePageCacheBusy();
        this.rejectTimedOutRequests();
    }

    /**
     * While the message store reports a busy OS page cache, removes queued send
     * requests one by one and answers each with {@code SYSTEM_BUSY}.
     */
    private void rejectQueuedRequestsWhilePageCacheBusy() {
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    break;
                }

                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                if (null == runnable) {
                    break;
                }

                final RequestTask rt = castRunnable(runnable);
                if (null == rt) {
                    // Not a request task (already logged by castRunnable); there is
                    // nobody to answer, move on to the next queued element.
                    continue;
                }

                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Throwable e) {
                // The element was already polled, so the loop still makes progress.
                log.warn("reject queued request while page cache busy failed", e);
            }
        }
    }

    /**
     * Rejects requests from the head of the send queue whose waiting time reached
     * {@code getWaitTimeMillsInSendQueue()}; stops at the first request that is still
     * within the limit, is already marked as stopped, or cannot be unwrapped.
     */
    private void rejectTimedOutRequests() {
        while (true) {
            try {
                if (this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    break;
                }

                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
                if (null == runnable) {
                    break;
                }

                final RequestTask rt = castRunnable(runnable);
                // The head is only peeked: dereferencing a null task here used to throw,
                // get swallowed, and re-peek the same element forever.
                if (null == rt || rt.isStopRun()) {
                    break;
                }

                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                if (behind < this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
                    break;
                }

                // remove() can lose the race against a worker thread that just took the
                // task; in that case simply look at the new head.
                if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
                    rt.setStopRun(true);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
                }
            } catch (Throwable e) {
                // The head may still be in the queue; stop this pass instead of spinning.
                // The next scheduled run retries.
                log.warn("reject timed out request failed", e);
                break;
            }
        }
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java (new file mode 100644, index 0000000..352543e)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.broker.latency;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * {@link ThreadPoolExecutor} whose {@code Runnable} submissions are wrapped in
 * {@link FutureTaskExt}, so the original runnable can be recovered from the work queue.
 * Only the {@code Runnable} variant of {@code newTaskFor} is overridden here.
 *
 * @author shijia.wxr
 */
public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
        final BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
        final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
        final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
        final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Wraps the submitted runnable in a {@link FutureTaskExt} that remembers it.
     */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
        return new FutureTaskExt<T>(runnable, value);
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java (new file mode 100644, index 0000000..642cdd9)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.broker.latency;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
 * {@link FutureTask} that keeps a reference to the {@link Runnable} it was built from.
 *
 * @author shijia.wxr
 */
public class FutureTaskExt<V> extends FutureTask<V> {
    /** The wrapped runnable; {@code null} when the task was created from a {@link Callable}. */
    private final Runnable runnable;

    public FutureTaskExt(final Callable<V> callable) {
        super(callable);
        this.runnable = null;
    }

    public FutureTaskExt(final Runnable runnable, final V result) {
        super(runnable, result);
        this.runnable = runnable;
    }

    /**
     * @return the runnable passed to the constructor, or {@code null} for callable-based tasks
     */
    public Runnable getRunnable() {
        return runnable;
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java (new file mode 100644, index 0000000..7e9e40a)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.longpolling;

import java.util.ArrayList;
import java.util.List;


/**
 * A list of held {@link PullRequest}s. Every method is {@code synchronized} on this instance.
 *
 * @author shijia.wxr
 */
public class ManyPullRequest {
    private final List<PullRequest> pullRequestList = new ArrayList<PullRequest>();


    /** Appends one request. */
    public synchronized void addPullRequest(final PullRequest pullRequest) {
        this.pullRequestList.add(pullRequest);
    }


    /** Appends all requests in {@code many}. */
    public synchronized void addPullRequest(final List<PullRequest> many) {
        this.pullRequestList.addAll(many);
    }


    /**
     * Hands out a snapshot of the held requests and empties the internal list.
     *
     * @return a new list with the requests held so far, or {@code null} when none are held
     */
    public synchronized List<PullRequest> cloneListAndClear() {
        if (this.pullRequestList.isEmpty()) {
            return null;
        }

        // Copy constructor instead of clone(): same snapshot, no unchecked cast.
        final List<PullRequest> result = new ArrayList<PullRequest>(this.pullRequestList);
        this.pullRequestList.clear();
        return result;
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java (new file mode 100644, index 0000000..f953c1e)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.broker.longpolling;

import org.apache.rocketmq.store.MessageArrivingListener;


/**
 * {@link MessageArrivingListener} that forwards every notification to a
 * {@link PullRequestHoldService}.
 */
public class NotifyMessageArrivingListener implements MessageArrivingListener {
    private final PullRequestHoldService pullRequestHoldService;


    public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
        this.pullRequestHoldService = pullRequestHoldService;
    }


    /**
     * Passes the arguments unchanged to
     * {@code PullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode)}.
     */
    @Override
    public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java (new file mode 100644, index 0000000..cf03b03)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.longpolling;

import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.Channel;


/**
 * Everything needed to answer a held pull request later: the original command, the
 * client channel, timing information, the requested offset and the subscription.
 * All fields are final and set once by the constructor.
 *
 * @author shijia.wxr
 */
public class PullRequest {
    private final RemotingCommand requestCommand;
    private final Channel clientChannel;
    private final long timeoutMillis;
    private final long suspendTimestamp;
    private final long pullFromThisOffset;
    private final SubscriptionData subscriptionData;


    public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
        long pullFromThisOffset, SubscriptionData subscriptionData) {
        this.requestCommand = requestCommand;
        this.clientChannel = clientChannel;
        this.timeoutMillis = timeoutMillis;
        this.suspendTimestamp = suspendTimestamp;
        this.pullFromThisOffset = pullFromThisOffset;
        this.subscriptionData = subscriptionData;
    }


    public RemotingCommand getRequestCommand() {
        return requestCommand;
    }


    public Channel getClientChannel() {
        return clientChannel;
    }


    public long getTimeoutMillis() {
        return timeoutMillis;
    }


    public long getSuspendTimestamp() {
        return suspendTimestamp;
    }


    public long getPullFromThisOffset() {
        return pullFromThisOffset;
    }

    public SubscriptionData getSubscriptionData() {
        return subscriptionData;
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java (new file mode 100644, index 0000000..19a3f54)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.longpolling;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.store.DefaultMessageFilter;
import org.apache.rocketmq.store.MessageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Keeps pull requests that could not be answered yet, grouped per {@code topic@queueId},
 * and re-executes them when a matching message arrives or when they time out.
 * A background loop re-checks every held queue periodically.
 *
 * @author shijia.wxr
 */
public class PullRequestHoldService extends ServiceThread {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final String TOPIC_QUEUEID_SEPARATOR = "@";
    private final BrokerController brokerController;
    private final SystemClock systemClock = new SystemClock();
    private final MessageFilter messageFilter = new DefaultMessageFilter();
    private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
        new ConcurrentHashMap<String, ManyPullRequest>(1024);


    public PullRequestHoldService(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    /**
     * Holds {@code pullRequest} under the key built from {@code topic} and {@code queueId},
     * creating the per-queue holder on first use.
     */
    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
        final String key = this.buildKey(topic, queueId);
        ManyPullRequest mpr = this.pullRequestTable.get(key);
        if (null == mpr) {
            final ManyPullRequest created = new ManyPullRequest();
            // Another thread may have registered a holder in the meantime; use the winner.
            final ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, created);
            mpr = (prev != null) ? prev : created;
        }

        mpr.addPullRequest(pullRequest);
    }

    /** Builds the table key {@code topic@queueId}. */
    private String buildKey(final String topic, final int queueId) {
        return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
    }

    /**
     * Service loop: waits (5 seconds when long polling is enabled, otherwise the
     * configured short-polling time), then re-checks all held requests. Runs until the
     * service is stopped; failures of one round are logged and the loop continues.
     */
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    this.waitForRunning(5 * 1000);
                } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

                final long beginLockTimestamp = this.systemClock.now();
                this.checkHoldRequest();
                final long costTime = this.systemClock.now() - beginLockTimestamp;
                if (costTime > 5 * 1000) {
                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
                }
            } catch (Throwable e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return PullRequestHoldService.class.getSimpleName();
    }

    /**
     * Re-evaluates the held requests of every key in the table against the current
     * max offset of its queue. A failure for one key is logged and does not stop the
     * scan of the remaining keys.
     */
    private void checkHoldRequest() {
        for (String key : this.pullRequestTable.keySet()) {
            // The queue id is always the last component of the key, so split at the
            // last separator; this also handles a topic that contains the separator.
            final int separatorIndex = key.lastIndexOf(TOPIC_QUEUEID_SEPARATOR);
            if (separatorIndex < 0) {
                continue;
            }

            final String topic = key.substring(0, separatorIndex);
            final String queueIdText = key.substring(separatorIndex + TOPIC_QUEUEID_SEPARATOR.length());
            try {
                final int queueId = Integer.parseInt(queueIdText);
                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueIdText, e);
            }
        }
    }

    /**
     * Same as {@link #notifyMessageArriving(String, int, long, Long)} with a {@code null} tags code.
     */
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
        notifyMessageArriving(topic, queueId, maxOffset, null);
    }

    /**
     * Re-evaluates every request held for {@code topic}/{@code queueId}.
     * <p>
     * A request is re-executed when the queue's offset is beyond the offset it asked for
     * and the message filter accepts {@code tagsCode} for its subscription, or when
     * its suspend time plus timeout has passed. All other requests are put back on hold.
     * A request whose evaluation fails is never dropped: it is treated as "no message"
     * and is therefore either timed out or held again.
     *
     * @param topic topic of the queue
     * @param queueId id of the queue
     * @param maxOffset offset reported by the caller; when it is not beyond a request's
     *                  offset the message store is asked for the current max offset
     * @param tagsCode value handed to the message filter; may be {@code null}
     */
    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
        final ManyPullRequest mpr = this.pullRequestTable.get(this.buildKey(topic, queueId));
        if (null == mpr) {
            return;
        }

        // From here on the requests live only in this local list, so each one must end
        // up either re-executed or back in the holder.
        final List<PullRequest> requestList = mpr.cloneListAndClear();
        if (null == requestList) {
            return;
        }

        final List<PullRequest> replayList = new ArrayList<PullRequest>();
        for (PullRequest request : requestList) {
            boolean messageArrived;
            try {
                messageArrived = this.hasMatchingMessage(request, topic, queueId, maxOffset, tagsCode);
            } catch (Throwable e) {
                log.error("check pull request failed. topic=" + topic + ", queueId=" + queueId, e);
                messageArrived = false;
            }

            if (messageArrived || this.isTimedOut(request)) {
                this.wakeUp(request);
            } else {
                replayList.add(request);
            }
        }

        if (!replayList.isEmpty()) {
            mpr.addPullRequest(replayList);
        }
    }

    /** Tells whether the queue holds something beyond the requested offset that passes the filter. */
    private boolean hasMatchingMessage(final PullRequest request, final String topic, final int queueId,
        final long maxOffset, final Long tagsCode) {
        long newestOffset = maxOffset;
        if (newestOffset <= request.getPullFromThisOffset()) {
            newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
        }

        return newestOffset > request.getPullFromThisOffset()
            && this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode);
    }

    /** Tells whether the request has been held for at least its timeout. */
    private boolean isTimedOut(final PullRequest request) {
        return System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis());
    }

    /** Re-executes the held request through the pull message processor; failures are logged. */
    private void wakeUp(final PullRequest request) {
        try {
            this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
                request.getRequestCommand());
        } catch (Throwable e) {
            log.error("execute request when wakeup failed.", e);
        }
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java (new file mode 100644, index 0000000..3a167fa)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.mqtrace;

import org.apache.rocketmq.store.stats.BrokerStatsManager;

import java.util.Map;


/**
 * Mutable holder for the data handed to {@link ConsumeMessageHook} callbacks.
 * It is a plain bean: every field has a getter and a setter and no validation is done.
 */
public class ConsumeMessageContext {
    private String consumerGroup;
    private String topic;
    private Integer queueId;
    private String clientHost;
    private String storeHost;
    private Map<String, Long> messageIds;
    private int bodyLength;
    private boolean success;
    private String status;
    private Object mqTraceContext;

    private String commercialOwner;
    private BrokerStatsManager.StatsType commercialRcvStats;
    private int commercialRcvTimes;
    private int commercialRcvSize;


    public String getConsumerGroup() {
        return consumerGroup;
    }


    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }


    public String getTopic() {
        return topic;
    }


    public void setTopic(String topic) {
        this.topic = topic;
    }


    public Integer getQueueId() {
        return queueId;
    }


    public void setQueueId(Integer queueId) {
        this.queueId = queueId;
    }


    public String getClientHost() {
        return clientHost;
    }


    public void setClientHost(String clientHost) {
        this.clientHost = clientHost;
    }


    public String getStoreHost() {
        return storeHost;
    }


    public void setStoreHost(String storeHost) {
        this.storeHost = storeHost;
    }


    public Map<String, Long> getMessageIds() {
        return messageIds;
    }


    public void setMessageIds(Map<String, Long> messageIds) {
        this.messageIds = messageIds;
    }


    public int getBodyLength() {
        return bodyLength;
    }


    public void setBodyLength(int bodyLength) {
        this.bodyLength = bodyLength;
    }


    public boolean isSuccess() {
        return success;
    }


    public void setSuccess(boolean success) {
        this.success = success;
    }


    public String getStatus() {
        return status;
    }


    public void setStatus(String status) {
        this.status = status;
    }


    public Object getMqTraceContext() {
        return mqTraceContext;
    }


    public void setMqTraceContext(Object mqTraceContext) {
        this.mqTraceContext = mqTraceContext;
    }

    public String getCommercialOwner() {
        return commercialOwner;
    }

    public void setCommercialOwner(final String commercialOwner) {
        this.commercialOwner = commercialOwner;
    }

    public BrokerStatsManager.StatsType getCommercialRcvStats() {
        return commercialRcvStats;
    }

    public void setCommercialRcvStats(final BrokerStatsManager.StatsType commercialRcvStats) {
        this.commercialRcvStats = commercialRcvStats;
    }

    public int getCommercialRcvTimes() {
        return commercialRcvTimes;
    }

    public void setCommercialRcvTimes(final int commercialRcvTimes) {
        this.commercialRcvTimes = commercialRcvTimes;
    }

    public int getCommercialRcvSize() {
        return commercialRcvSize;
    }

    public void setCommercialRcvSize(final int commercialRcvSize) {
        this.commercialRcvSize = commercialRcvSize;
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
// diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java (new file mode 100644, index 0000000..c4b7f36)
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.broker.mqtrace;

/**
 * Callback pair invoked around message consumption, each receiving a
 * {@link ConsumeMessageContext}.
 */
public interface ConsumeMessageHook {
    /** @return the name identifying this hook */
    String hookName();


    /** Called before consumption with the context of the request. */
    void consumeMessageBefore(final ConsumeMessageContext context);


    /** Called after consumption with the context of the request. */
    void consumeMessageAfter(final ConsumeMessageContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java new file mode 100644 index 0000000..ca8121d --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.mqtrace; + +import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.store.stats.BrokerStatsManager; + +import java.util.Properties; + + +public class SendMessageContext { + private String producerGroup; + private String topic; + private String msgId; + private String originMsgId; + private Integer queueId; + private Long queueOffset; + private String brokerAddr; + private String bornHost; + private int bodyLength; + private int code; + private String errorMsg; + private String msgProps; + private Object mqTraceContext; + private Properties extProps; + private String brokerRegionId; + private String msgUniqueKey; + private long bornTimeStamp; + private MessageType msgType = MessageType.Trans_msg_Commit; + private boolean isSuccess = false; + //For Commercial + private String commercialOwner; + private BrokerStatsManager.StatsType commercialSendStats; + private int commercialSendSize; + private int commercialSendTimes; + + public boolean isSuccess() { + return isSuccess; + } + + public void setSuccess(final boolean success) { + isSuccess = success; + } + + public MessageType getMsgType() { + return msgType; + } + + public void setMsgType(final MessageType msgType) { + this.msgType = msgType; + } + + public String getMsgUniqueKey() { + return msgUniqueKey; + } + + public void setMsgUniqueKey(final String msgUniqueKey) { + this.msgUniqueKey = msgUniqueKey; + } + + public long getBornTimeStamp() { + return bornTimeStamp; + } + + public void setBornTimeStamp(final long bornTimeStamp) { + this.bornTimeStamp = bornTimeStamp; + } + + public String getBrokerRegionId() { + return brokerRegionId; + } + + public void setBrokerRegionId(final String brokerRegionId) { + this.brokerRegionId = brokerRegionId; + } + + public String getProducerGroup() { + return producerGroup; + } + + + public void 
setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + + public String getTopic() { + return topic; + } + + + public void setTopic(String topic) { + this.topic = topic; + } + + + public String getMsgId() { + return msgId; + } + + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + + public String getOriginMsgId() { + return originMsgId; + } + + + public void setOriginMsgId(String originMsgId) { + this.originMsgId = originMsgId; + } + + + public Integer getQueueId() { + return queueId; + } + + + public void setQueueId(Integer queueId) { + this.queueId = queueId; + } + + + public Long getQueueOffset() { + return queueOffset; + } + + + public void setQueueOffset(Long queueOffset) { + this.queueOffset = queueOffset; + } + + + public String getBrokerAddr() { + return brokerAddr; + } + + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + + public String getBornHost() { + return bornHost; + } + + + public void setBornHost(String bornHost) { + this.bornHost = bornHost; + } + + + public int getBodyLength() { + return bodyLength; + } + + + public void setBodyLength(int bodyLength) { + this.bodyLength = bodyLength; + } + + + public int getCode() { + return code; + } + + + public void setCode(int code) { + this.code = code; + } + + + public String getErrorMsg() { + return errorMsg; + } + + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } + + + public String getMsgProps() { + return msgProps; + } + + + public void setMsgProps(String msgProps) { + this.msgProps = msgProps; + } + + + public Object getMqTraceContext() { + return mqTraceContext; + } + + + public void setMqTraceContext(Object mqTraceContext) { + this.mqTraceContext = mqTraceContext; + } + + + public Properties getExtProps() { + return extProps; + } + + + public void setExtProps(Properties extProps) { + this.extProps = extProps; + } + + public String getCommercialOwner() { + return commercialOwner; + } + 
+ public void setCommercialOwner(final String commercialOwner) { + this.commercialOwner = commercialOwner; + } + + public BrokerStatsManager.StatsType getCommercialSendStats() { + return commercialSendStats; + } + + public void setCommercialSendStats(final BrokerStatsManager.StatsType commercialSendStats) { + this.commercialSendStats = commercialSendStats; + } + + public int getCommercialSendSize() { + return commercialSendSize; + } + + public void setCommercialSendSize(final int commercialSendSize) { + this.commercialSendSize = commercialSendSize; + } + + public int getCommercialSendTimes() { + return commercialSendTimes; + } + + public void setCommercialSendTimes(final int commercialSendTimes) { + this.commercialSendTimes = commercialSendTimes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java new file mode 100644 index 0000000..84cbdcb --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.mqtrace; + +public interface SendMessageHook { + public String hookName(); + + + public void sendMessageBefore(final SendMessageContext context); + + + public void sendMessageAfter(final SendMessageContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java new file mode 100644 index 0000000..8a1773a --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.offset; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.BrokerPathConfigHelper; +import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class ConsumerOffsetManager extends ConfigManager { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private static final String TOPIC_GROUP_SEPARATOR = "@"; + + private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = + new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); + + private transient BrokerController brokerController; + + + public ConsumerOffsetManager() { + } + + + public ConsumerOffsetManager(BrokerController brokerController) { + this.brokerController = brokerController; + } + + + public void scanUnsubscribedTopic() { + Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + String topicAtGroup = next.getKey(); + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays != null && arrays.length == 2) { + String topic = arrays[0]; + String group = arrays[1]; + + if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic) + && this.offsetBehindMuchThanData(topic, next.getValue())) { + it.remove(); + log.warn("remove topic offset, {}", topicAtGroup); + } + } + } + } + + + private boolean 
offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) { + Iterator<Entry<Integer, Long>> it = table.entrySet().iterator(); + boolean result = !table.isEmpty(); + + while (it.hasNext() && result) { + Entry<Integer, Long> next = it.next(); + long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey()); + long offsetInPersist = next.getValue(); + if (offsetInPersist > minOffsetInStore) { + result = false; + } else { + result = true; + } + } + + return result; + } + + + public Set<String> whichTopicByConsumer(final String group) { + Set<String> topics = new HashSet<String>(); + + Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + String topicAtGroup = next.getKey(); + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays != null && arrays.length == 2) { + if (group.equals(arrays[1])) { + topics.add(arrays[0]); + } + } + } + + return topics; + } + + + public Set<String> whichGroupByTopic(final String topic) { + Set<String> groups = new HashSet<String>(); + + Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + String topicAtGroup = next.getKey(); + String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (arrays != null && arrays.length == 2) { + if (topic.equals(arrays[0])) { + groups.add(arrays[1]); + } + } + } + + return groups; + } + + + public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + this.commitOffset(clientHost, key, queueId, offset); + } + + private void commitOffset(final String clientHost, final String key, final int 
queueId, final long offset) { + ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); + if (null == map) { + map = new ConcurrentHashMap<Integer, Long>(32); + map.put(queueId, offset); + this.offsetTable.put(key, map); + } else { + Long storeOffset = map.put(queueId, offset); + if (storeOffset != null && offset < storeOffset) { + log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); + } + } + } + + public long queryOffset(final String group, final String topic, final int queueId) { + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); + if (null != map) { + Long offset = map.get(queueId); + if (offset != null) + return offset; + } + + return -1; + } + + public String encode() { + return this.encode(false); + } + + @Override + public String configFilePath() { + return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); + } + + @Override + public void decode(String jsonString) { + if (jsonString != null) { + ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); + if (obj != null) { + this.offsetTable = obj.offsetTable; + } + } + } + + public String encode(final boolean prettyFormat) { + return RemotingSerializable.toJson(this, prettyFormat); + } + + public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { + return offsetTable; + } + + + public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { + this.offsetTable = offsetTable; + } + + + public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) { + + Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>(); + Set<String> topicGroups = this.offsetTable.keySet(); + if 
(!UtilAll.isBlank(filterGroups)) { + for (String group : filterGroups.split(",")) { + Iterator<String> it = topicGroups.iterator(); + while (it.hasNext()) { + if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) { + it.remove(); + } + } + } + } + + for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) { + String topicGroup = offSetEntry.getKey(); + String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); + if (topic.equals(topicGroupArr[0])) { + for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) { + long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey()); + if (entry.getValue() >= minOffset) { + Long offset = queueMinOffset.get(entry.getKey()); + if (offset == null) { + queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue())); + } else { + queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset)); + } + } + } + } + + } + return queueMinOffset; + } + + + public Map<Integer, Long> queryOffset(final String group, final String topic) { + // topic@group + String key = topic + TOPIC_GROUP_SEPARATOR + group; + return this.offsetTable.get(key); + } + + + public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { + ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); + if (offsets != null) { + this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets)); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java new file mode 100644 index 0000000..2836c4c --- 
/dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.out; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.namesrv.RegisterBrokerResult; +import org.apache.rocketmq.common.namesrv.TopAddressing; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.*; +import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.*; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyRemotingClient; +import 
org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; + + +/** + * @author shijia.wxr + * @author manhong.yqd + */ +public class BrokerOuterAPI { + private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); + private final RemotingClient remotingClient; + private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR); + private String nameSrvAddr = null; + + public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) { + this(nettyClientConfig, null); + } + + public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) { + this.remotingClient = new NettyRemotingClient(nettyClientConfig); + this.remotingClient.registerRPCHook(rpcHook); + } + + public void start() { + this.remotingClient.start(); + } + + public void shutdown() { + this.remotingClient.shutdown(); + } + + public String fetchNameServerAddr() { + try { + String addrs = this.topAddressing.fetchNSAddr(); + if (addrs != null) { + if (!addrs.equals(this.nameSrvAddr)) { + log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs); + this.updateNameServerAddressList(addrs); + this.nameSrvAddr = addrs; + return nameSrvAddr; + } + } + } catch (Exception e) { + log.error("fetchNameServerAddr Exception", e); + } + return nameSrvAddr; + } + + public void updateNameServerAddressList(final String addrs) { + List<String> lst = new ArrayList<String>(); + String[] addrArray = addrs.split(";"); + if (addrArray != null) { + for (String addr : addrArray) { + lst.add(addr); + } + + this.remotingClient.updateNameServerAddressList(lst); + } + } + + public RegisterBrokerResult registerBrokerAll( + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper 
topicConfigWrapper, + final List<String> filterServerList, + final boolean oneway, + final int timeoutMills) { + RegisterBrokerResult registerBrokerResult = null; + + List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); + if (nameServerAddressList != null) { + for (String namesrvAddr : nameServerAddressList) { + try { + RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, + haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); + if (result != null) { + registerBrokerResult = result; + } + + log.info("register broker to name server {} OK", namesrvAddr); + } catch (Exception e) { + log.warn("registerBroker Exception, " + namesrvAddr, e); + } + } + } + + return registerBrokerResult; + } + + private RegisterBrokerResult registerBroker( + final String namesrvAddr, + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId, + final String haServerAddr, + final TopicConfigSerializeWrapper topicConfigWrapper, + final List<String> filterServerList, + final boolean oneway, + final int timeoutMills + ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException { + RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + requestHeader.setBrokerId(brokerId); + requestHeader.setBrokerName(brokerName); + requestHeader.setClusterName(clusterName); + requestHeader.setHaServerAddr(haServerAddr); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); + + RegisterBrokerBody requestBody = new RegisterBrokerBody(); + requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); + requestBody.setFilterServerList(filterServerList); + request.setBody(requestBody.encode()); + + if (oneway) { + try { + 
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); + } catch (RemotingTooMuchRequestException e) { + } + return null; + } + + RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + RegisterBrokerResponseHeader responseHeader = + (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); + RegisterBrokerResult result = new RegisterBrokerResult(); + result.setMasterAddr(responseHeader.getMasterAddr()); + result.setHaServerAddr(responseHeader.getHaServerAddr()); + result.setHaServerAddr(responseHeader.getHaServerAddr()); + if (response.getBody() != null) { + result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); + } + return result; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public void unregisterBrokerAll( + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId + ) { + List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); + if (nameServerAddressList != null) { + for (String namesrvAddr : nameServerAddressList) { + try { + this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId); + log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr); + } catch (Exception e) { + log.warn("unregisterBroker Exception, " + namesrvAddr, e); + } + } + } + } + + public void unregisterBroker( + final String namesrvAddr, + final String clusterName, + final String brokerAddr, + final String brokerName, + final long brokerId + ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader(); + requestHeader.setBrokerAddr(brokerAddr); + 
requestHeader.setBrokerId(brokerId); + requestHeader.setBrokerName(brokerName); + requestHeader.setClusterName(clusterName); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public String 
getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException, UnsupportedEncodingException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return new String(response.getBody(), MixAll.DEFAULT_CHARSET); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); + RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class); + } + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public void registerRPCHook(RPCHook rpcHook) { + remotingClient.registerRPCHook(rpcHook); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java new file mode 100644 index 0000000..d26eab8 --- /dev/null +++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pagecache; + +import org.apache.rocketmq.store.GetMessageResult; +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion { + private final ByteBuffer byteBufferHeader; + private final GetMessageResult getMessageResult; + private long transfered; // the bytes which was transfered already + + + public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) { + this.byteBufferHeader = byteBufferHeader; + this.getMessageResult = getMessageResult; + } + + + @Override + public long position() { + int pos = byteBufferHeader.position(); + List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + pos += bb.position(); + } + return pos; + } + + @Override + public 
long transfered() { + return transfered; + } + + @Override + public long count() { + return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize(); + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + if (this.byteBufferHeader.hasRemaining()) { + transfered += target.write(this.byteBufferHeader); + return transfered; + } else { + List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + if (bb.hasRemaining()) { + transfered += target.write(bb); + return transfered; + } + } + } + + return 0; + } + + public void close() { + this.deallocate(); + } + + @Override + protected void deallocate() { + this.getMessageResult.release(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java new file mode 100644 index 0000000..97d1faa --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.pagecache; + +import org.apache.rocketmq.store.SelectMappedBufferResult; +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + + +/** + * @author shijia.wxr + */ +public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion { + private final ByteBuffer byteBufferHeader; + private final SelectMappedBufferResult selectMappedBufferResult; + private long transfered; // the bytes which was transfered already + + + public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) { + this.byteBufferHeader = byteBufferHeader; + this.selectMappedBufferResult = selectMappedBufferResult; + } + + + @Override + public long position() { + return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position(); + } + + @Override + public long transfered() { + return transfered; + } + + @Override + public long count() { + return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize(); + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + if (this.byteBufferHeader.hasRemaining()) { + transfered += target.write(this.byteBufferHeader); + return transfered; + } else if (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { + transfered += target.write(this.selectMappedBufferResult.getByteBuffer()); + return transfered; + } 
+ + return 0; + } + + public void close() { + this.deallocate(); + } + + @Override + protected void deallocate() { + this.selectMappedBufferResult.release(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java new file mode 100644 index 0000000..2d21c19 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.broker.pagecache; + +import org.apache.rocketmq.store.QueryMessageResult; +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion { + private final ByteBuffer byteBufferHeader; + private final QueryMessageResult queryMessageResult; + private long transfered; // the bytes which was transfered already + + + public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) { + this.byteBufferHeader = byteBufferHeader; + this.queryMessageResult = queryMessageResult; + } + + + @Override + public long position() { + int pos = byteBufferHeader.position(); + List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + pos += bb.position(); + } + return pos; + } + + @Override + public long transfered() { + return transfered; + } + + @Override + public long count() { + return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize(); + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + if (this.byteBufferHeader.hasRemaining()) { + transfered += target.write(this.byteBufferHeader); + return transfered; + } else { + List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList(); + for (ByteBuffer bb : messageBufferList) { + if (bb.hasRemaining()) { + transfered += target.write(bb); + return transfered; + } + } + } + + return 0; + } + + public void close() { + this.deallocate(); + } + + @Override + protected void deallocate() { + this.queryMessageResult.release(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java new file mode 100644 index 0000000..601e2f3 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * + */ +package org.apache.rocketmq.broker.plugin; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.store.*; + +import java.util.HashMap; +import java.util.Set; + +public abstract class AbstractPluginMessageStore implements MessageStore { + protected MessageStore next = null; + protected MessageStorePluginContext context; + + public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) { + this.next = next; + this.context = context; + } + + @Override + public long getEarliestMessageTime() { + return next.getEarliestMessageTime(); + } + + @Override + public long lockTimeMills() { + return next.lockTimeMills(); + } + + @Override + public boolean isOSPageCacheBusy() { + return next.isOSPageCacheBusy(); + } + + @Override + public boolean isTransientStorePoolDeficient() { + return next.isTransientStorePoolDeficient(); + } + + @Override + public boolean load() { + return next.load(); + } + + @Override + public void start() throws Exception { + next.start(); + } + + @Override + public void shutdown() { + next.shutdown(); + } + + @Override + public void destroy() { + next.destroy(); + } + + @Override + public PutMessageResult putMessage(MessageExtBrokerInner msg) { + return next.putMessage(msg); + } + + @Override + public GetMessageResult getMessage(String group, String topic, int queueId, long offset, + int maxMsgNums, SubscriptionData subscriptionData) { + return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData); + } + + @Override + public long getMaxOffsetInQuque(String topic, int queueId) { + return next.getMaxOffsetInQuque(topic, queueId); + } + + @Override + public long getMinOffsetInQuque(String topic, int queueId) { + return next.getMinOffsetInQuque(topic, queueId); + } + + @Override + public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) { + return 
next.getCommitLogOffsetInQueue(topic, queueId, cqOffset); + } + + @Override + public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { + return next.getOffsetInQueueByTime(topic, queueId, timestamp); + } + + @Override + public MessageExt lookMessageByOffset(long commitLogOffset) { + return next.lookMessageByOffset(commitLogOffset); + } + + @Override + public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) { + return next.selectOneMessageByOffset(commitLogOffset); + } + + @Override + public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) { + return next.selectOneMessageByOffset(commitLogOffset, msgSize); + } + + @Override + public String getRunningDataInfo() { + return next.getRunningDataInfo(); + } + + @Override + public HashMap<String, String> getRuntimeInfo() { + return next.getRuntimeInfo(); + } + + @Override + public long getMaxPhyOffset() { + return next.getMaxPhyOffset(); + } + + @Override + public long getMinPhyOffset() { + return next.getMinPhyOffset(); + } + + @Override + public long getEarliestMessageTime(String topic, int queueId) { + return next.getEarliestMessageTime(topic, queueId); + } + + @Override + public long getMessageStoreTimeStamp(String topic, int queueId, long offset) { + return next.getMessageStoreTimeStamp(topic, queueId, offset); + } + + @Override + public long getMessageTotalInQueue(String topic, int queueId) { + return next.getMessageTotalInQueue(topic, queueId); + } + + @Override + public SelectMappedBufferResult getCommitLogData(long offset) { + return next.getCommitLogData(offset); + } + + @Override + public boolean appendToCommitLog(long startOffset, byte[] data) { + return next.appendToCommitLog(startOffset, data); + } + + @Override + public void excuteDeleteFilesManualy() { + next.excuteDeleteFilesManualy(); + } + + @Override + public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, + long end) { + return 
next.queryMessage(topic, key, maxNum, begin, end); + } + + @Override + public void updateHaMasterAddress(String newAddr) { + next.updateHaMasterAddress(newAddr); + } + + @Override + public long slaveFallBehindMuch() { + return next.slaveFallBehindMuch(); + } + + @Override + public long now() { + return next.now(); + } + + @Override + public int cleanUnusedTopic(Set<String> topics) { + return next.cleanUnusedTopic(topics); + } + + @Override + public void cleanExpiredConsumerQueue() { + next.cleanExpiredConsumerQueue(); + } + + @Override + public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) { + return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset); + } + + @Override + public long dispatchBehindBytes() { + return next.dispatchBehindBytes(); + } + + @Override + public long flush() { + return next.flush(); + } + + @Override + public boolean resetWriteOffset(long phyOffset) { + return next.resetWriteOffset(phyOffset); + } + + @Override + public long getConfirmOffset() { + return next.getConfirmOffset(); + } + + @Override + public void setConfirmOffset(long phyOffset) { + next.setConfirmOffset(phyOffset); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java new file mode 100644 index 0000000..d27b6aa --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.rocketmq.broker.plugin; + +import org.apache.rocketmq.store.MessageStore; + +import java.io.IOException; +import java.lang.reflect.Constructor; + +public final class MessageStoreFactory { + public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore) + throws IOException { + String plugin = context.getBrokerConfig().getMessageStorePlugIn(); + if (plugin != null && plugin.trim().length() != 0) { + String[] pluginClasses = plugin.split(","); + for (int i = pluginClasses.length - 1; i >= 0; --i) { + String pluginClass = pluginClasses[i]; + try { + @SuppressWarnings("unchecked") + Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass); + Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class); + AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore); + messageStore = pluginMessageStore; + } catch (Throwable e) { + throw new RuntimeException(String.format( + "Initialize plugin's class %s not found!", pluginClass), e); + } + } + } + return messageStore; + } +}