http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 3e6673c..5fdcab2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.client.impl;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -25,7 +32,12 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
@@ -39,42 +51,28 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-
public class MQAdminImpl {
private final Logger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private long timeoutMillis = 6000;
-
public MQAdminImpl(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
-
public long getTimeoutMillis() {
return timeoutMillis;
}
-
public void setTimeoutMillis(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
-
public void createTopic(String key, String newTopic, int queueNum) throws
MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
-
public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag) throws MQClientException {
try {
TopicRouteData topicRouteData =
this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key,
timeoutMillis);
@@ -129,7 +127,6 @@ public class MQAdminImpl {
}
}
-
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws
MQClientException {
try {
TopicRouteData topicRouteData =
this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic,
timeoutMillis);
@@ -146,7 +143,6 @@ public class MQAdminImpl {
throw new MQClientException("Unknow why, Can not find Message Queue
for this topic, " + topic, null);
}
-
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws
MQClientException {
try {
TopicRouteData topicRouteData =
this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic,
timeoutMillis);
@@ -160,14 +156,13 @@ public class MQAdminImpl {
}
} catch (Exception e) {
throw new MQClientException(
- "Can not find Message Queue for this topic, " + topic +
FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
- e);
+ "Can not find Message Queue for this topic, " + topic +
FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
+ e);
}
throw new MQClientException("Unknow why, Can not find Message Queue
for this topic, " + topic, null);
}
-
public long searchOffset(MessageQueue mq, long timestamp) throws
MQClientException {
String brokerAddr =
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -178,7 +173,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
return
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr,
mq.getTopic(), mq.getQueueId(), timestamp,
- timeoutMillis);
+ timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "]
exception", e);
}
@@ -187,7 +182,6 @@ public class MQAdminImpl {
throw new MQClientException("The broker[" + mq.getBrokerName() + "]
not exist", null);
}
-
public long maxOffset(MessageQueue mq) throws MQClientException {
String brokerAddr =
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -206,7 +200,6 @@ public class MQAdminImpl {
throw new MQClientException("The broker[" + mq.getBrokerName() + "]
not exist", null);
}
-
public long minOffset(MessageQueue mq) throws MQClientException {
String brokerAddr =
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -225,7 +218,6 @@ public class MQAdminImpl {
throw new MQClientException("The broker[" + mq.getBrokerName() + "]
not exist", null);
}
-
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException
{
String brokerAddr =
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -236,7 +228,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
return
this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr,
mq.getTopic(), mq.getQueueId(),
- timeoutMillis);
+ timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "]
exception", e);
}
@@ -254,18 +246,18 @@ public class MQAdminImpl {
throw new MQClientException(ResponseCode.NO_MESSAGE, "query
message by id finished, but no message.");
}
return
this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
- messageId.getOffset(), timeoutMillis);
+ messageId.getOffset(), timeoutMillis);
}
public QueryResult queryMessage(String topic, String key, int maxNum, long
begin, long end) throws MQClientException,
- InterruptedException {
+ InterruptedException {
return queryMessage(topic, key, maxNum, begin, end, false);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
throws InterruptedException, MQClientException {
QueryResult qr = this.queryMessage(topic, uniqKey, 32,
- MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() -
1000, Long.MAX_VALUE, true);
+ MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() -
1000, Long.MAX_VALUE, true);
if (qr != null && qr.getMessageList() != null &&
qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
} else {
@@ -274,7 +266,7 @@ public class MQAdminImpl {
}
protected QueryResult queryMessage(String topic, String key, int maxNum,
long begin, long end, boolean isUniqKey) throws MQClientException,
- InterruptedException {
+ InterruptedException {
TopicRouteData topicRouteData =
this.mQClientFactory.getAnExistTopicRouteData(topic);
if (null == topicRouteData) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
@@ -304,43 +296,43 @@ public class MQAdminImpl {
requestHeader.setEndTimestamp(end);
this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader,
timeoutMillis * 3,
- new InvokeCallback() {
- @Override
- public void
operationComplete(ResponseFuture responseFuture) {
- try {
- RemotingCommand response =
responseFuture.getResponseCommand();
- if (response != null) {
- switch (response.getCode()) {
- case ResponseCode.SUCCESS:
{
-
QueryMessageResponseHeader responseHeader = null;
- try {
- responseHeader =
-
(QueryMessageResponseHeader) response
-
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
- } catch
(RemotingCommandException e) {
-
log.error("decodeCommandCustomHeader exception", e);
- return;
- }
-
- List<MessageExt>
wrappers =
-
MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
-
- QueryResult qr = new
QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
-
queryResultList.add(qr);
- break;
+ new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture
responseFuture) {
+ try {
+ RemotingCommand response =
responseFuture.getResponseCommand();
+ if (response != null) {
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryMessageResponseHeader
responseHeader = null;
+ try {
+ responseHeader =
+
(QueryMessageResponseHeader)response
+
.decodeCommandCustomHeader(QueryMessageResponseHeader.class);
+ } catch
(RemotingCommandException e) {
+
log.error("decodeCommandCustomHeader exception", e);
+ return;
}
- default:
-
log.warn("getResponseCommand failed, {} {}", response.getCode(),
response.getRemark());
- break;
+
+ List<MessageExt> wrappers =
+
MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
+
+ QueryResult qr = new
QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
+ queryResultList.add(qr);
+ break;
}
- } else {
- log.warn("getResponseCommand
return null");
+ default:
+
log.warn("getResponseCommand failed, {} {}", response.getCode(),
response.getRemark());
+ break;
}
- } finally {
- countDownLatch.countDown();
+ } else {
+ log.warn("getResponseCommand
return null");
}
+ } finally {
+ countDownLatch.countDown();
}
- }, isUniqKey);
+ }
+ }, isUniqKey);
} catch (Exception e) {
log.warn("queryMessage exception", e);
}