http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java index da6401e..cf7cdcd 100644 --- a/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java +++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java @@ -20,7 +20,6 @@ public class SubscriptionSysFlag { private final static int FLAG_UNIT = 0x1 << 0; - public static int buildSysFlag(final boolean unit) { int sysFlag = 0; @@ -31,22 +30,18 @@ public class SubscriptionSysFlag { return sysFlag; } - public static int setUnitFlag(final int sysFlag) { return sysFlag | FLAG_UNIT; } - public static int clearUnitFlag(final int sysFlag) { return sysFlag & (~FLAG_UNIT); } - public static boolean hasUnitFlag(final int sysFlag) { return (sysFlag & FLAG_UNIT) == FLAG_UNIT; } - public static void main(String[] args) { } }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java index 1d804db..2c45150 100644 --- a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java +++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java @@ -27,7 +27,6 @@ public class TopicSysFlag { private final static int FLAG_UNIT_SUB = 0x1 << 1; - public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) { int sysFlag = 0; @@ -42,37 +41,30 @@ public class TopicSysFlag { return sysFlag; } - public static int setUnitFlag(final int sysFlag) { return sysFlag | FLAG_UNIT; } - public static int clearUnitFlag(final int sysFlag) { return sysFlag & (~FLAG_UNIT); } - public static boolean hasUnitFlag(final int sysFlag) { return (sysFlag & FLAG_UNIT) == FLAG_UNIT; } - public static int setUnitSubFlag(final int sysFlag) { return sysFlag | FLAG_UNIT_SUB; } - public static int clearUnitSubFlag(final int sysFlag) { return sysFlag & (~FLAG_UNIT_SUB); } - public static boolean hasUnitSubFlag(final int sysFlag) { return (sysFlag & FLAG_UNIT_SUB) == FLAG_UNIT_SUB; } - public static void main(String[] args) { } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java index ab017f2..dcb9187 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java @@ -6,25 +6,24 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common.utils; import io.netty.channel.Channel; - import java.net.InetAddress; import java.net.InetSocketAddress; public class ChannelUtil { public static String getRemoteIp(Channel channel) { - InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress(); + InetSocketAddress inetSocketAddress = (InetSocketAddress)channel.remoteAddress(); if (inetSocketAddress == null) { return ""; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java index fcd002c..0cc3463 100755 --- a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java @@ -6,20 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common.utils; -import org.apache.rocketmq.common.MQVersion; -import org.apache.rocketmq.common.MixAll; - import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; @@ -27,21 +24,22 @@ import java.net.URL; import java.net.URLEncoder; import java.util.Iterator; import java.util.List; - +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; public class HttpTinyClient { static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues, - String encoding, long readTimeoutMs) throws IOException { + String encoding, long readTimeoutMs) throws IOException { String encodedContent = encodingParams(paramValues, encoding); url += (null == encodedContent) ? "" : ("?" + encodedContent); HttpURLConnection conn = null; try { - conn = (HttpURLConnection) new URL(url).openConnection(); + conn = (HttpURLConnection)new URL(url).openConnection(); conn.setRequestMethod("GET"); - conn.setConnectTimeout((int) readTimeoutMs); - conn.setReadTimeout((int) readTimeoutMs); + conn.setConnectTimeout((int)readTimeoutMs); + conn.setReadTimeout((int)readTimeoutMs); setHeaders(conn, headers, encoding); conn.connect(); @@ -62,7 +60,7 @@ public class HttpTinyClient { } static private String encodingParams(List<String> paramValues, String encoding) - throws UnsupportedEncodingException { + throws UnsupportedEncodingException { StringBuilder sb = new StringBuilder(); if (null == paramValues) { return null; @@ -87,7 +85,6 @@ public class HttpTinyClient { conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding); - String ts = String.valueOf(System.currentTimeMillis()); conn.addRequestProperty("Metaq-Client-RequestTS", ts); } @@ -110,15 +107,15 @@ public class HttpTinyClient { * @throws java.io.IOException */ static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues, - String encoding, long readTimeoutMs) throws IOException { + String encoding, long readTimeoutMs) throws IOException { String encodedContent = encodingParams(paramValues, encoding); HttpURLConnection conn = null; try { - conn = (HttpURLConnection) new URL(url).openConnection(); + conn = (HttpURLConnection)new URL(url).openConnection(); conn.setRequestMethod("POST"); conn.setConnectTimeout(3000); - conn.setReadTimeout((int) readTimeoutMs); + conn.setReadTimeout((int)readTimeoutMs); conn.setDoOutput(true); conn.setDoInput(true); setHeaders(conn, headers, encoding); @@ -145,7 +142,6 @@ public class HttpTinyClient { final public int code; final public String content; - public HttpResult(int code, String content) { this.code = code; this.content = content; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java index a5152f8..b569c24 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java @@ -6,40 +6,46 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common.utils; -import org.apache.rocketmq.remoting.common.RemotingHelper; - -import java.io.*; +import java.io.BufferedReader; +import java.io.CharArrayWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.io.Writer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; - +import org.apache.rocketmq.remoting.common.RemotingHelper; public class IOTinyUtils { static public String toString(InputStream input, String encoding) throws IOException { return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader( - input, encoding)); + input, encoding)); } - static public String toString(Reader reader) throws IOException { CharArrayWriter sw = new CharArrayWriter(); copy(reader, sw); return sw.toString(); } - static public long copy(Reader input, Writer output) throws IOException { char[] buffer = new char[1 << 12]; long count = 0; @@ -50,7 +56,6 @@ public class IOTinyUtils { return count; } - /** */ @@ -58,7 +63,7 @@ public class IOTinyUtils { BufferedReader reader = toBufferedReader(input); List<String> list = new ArrayList<String>(); String line = null; - for (;;) { + for (; ; ) { line = reader.readLine(); if (null != line) { list.add(line); @@ -69,12 +74,10 @@ public class IOTinyUtils { return list; } - static private BufferedReader toBufferedReader(Reader reader) { - return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader); + return reader instanceof BufferedReader ? (BufferedReader)reader : new BufferedReader(reader); } - static public void copyFile(String source, String target) throws IOException { File sf = new File(source); if (!sf.exists()) { @@ -102,7 +105,6 @@ public class IOTinyUtils { } } - public static void delete(File fileOrDir) throws IOException { if (fileOrDir == null) { return; @@ -115,7 +117,6 @@ public class IOTinyUtils { fileOrDir.delete(); } - /** */ @@ -149,7 +150,6 @@ public class IOTinyUtils { } } - public static void writeStringToFile(File file, String data, String encoding) throws IOException { OutputStream os = null; try { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java index 0006f74..3205c64 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java @@ -6,23 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; -import junit.framework.Assert; -import org.junit.Test; - import java.net.InetAddress; import java.util.List; - +import junit.framework.Assert; +import org.junit.Test; public class MixAllTest { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java index b7509b1..9211d37 100644 --- a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java @@ -6,20 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.junit.Test; - public class RemotingUtilTest { @Test public void test() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java index decd3d0..b21d65b 100644 --- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java @@ -6,25 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; -import org.junit.Test; - import java.net.URL; import java.util.Properties; +import org.junit.Test; import static org.junit.Assert.assertTrue; - public class UtilAllTest { @Test @@ -32,7 +30,6 @@ public class UtilAllTest { System.out.println(UtilAll.currentStackTrace()); } - @Test public void test_a() { URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation(); @@ -40,14 +37,12 @@ public class UtilAllTest { System.out.println(url.getPath()); } - @Test public void test_resetClassProperties() { DemoConfig demoConfig = new DemoConfig(); MixAll.properties2Object(new Properties(), demoConfig); } - @Test public void test_properties2String() { DemoConfig demoConfig = new DemoConfig(); @@ -55,13 +50,11 @@ public class UtilAllTest { System.out.println(MixAll.properties2String(properties)); } - @Test public void test_timeMillisToHumanString() { System.out.println(UtilAll.timeMillisToHumanString()); } - @Test public void test_isPropertiesEqual() { final Properties p1 = new Properties(); @@ -77,7 +70,6 @@ public class UtilAllTest { assertTrue(MixAll.isPropertiesEqual(p1, p2)); } - @Test public void test_getpid() { int pid = UtilAll.getPid(); @@ -86,7 +78,6 @@ public class UtilAllTest { assertTrue(pid > 0); } - @Test public void test_isBlank() { { @@ -121,42 +112,34 @@ public class UtilAllTest { private boolean demoOK = false; private String demoName = "haha"; - public int getDemoWidth() { return demoWidth; } - public void setDemoWidth(int demoWidth) { this.demoWidth = demoWidth; } - public int getDemoLength() { return demoLength; } - public void setDemoLength(int demoLength) { this.demoLength = demoLength; } - public boolean isDemoOK() { return demoOK; } - public void setDemoOK(boolean demoOK) { this.demoOK = demoOK; } - public String getDemoName() { return demoName; } - public void setDemoNfieldame(String demoName) { this.demoName = demoName; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java index cf26efd..5a97db9 100644 --- a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java +++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common.filter; @@ -21,7 +21,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.junit.Test; - /** * */ @@ -30,14 +29,14 @@ public class FilterAPITest { @Test public void testBuildSubscriptionData() throws Exception { SubscriptionData subscriptionData = - FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3"); + FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3"); System.out.println(subscriptionData); } @Test public void testSubscriptionData() throws Exception { SubscriptionData subscriptionData = - FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3"); + FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3"); subscriptionData.setFilterClassSource("java hello"); String json = RemotingSerializable.toJson(subscriptionData, true); System.out.println(json); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java index 79c6bbf..b511537 100644 --- a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common.protocol; @@ -21,7 +21,6 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.junit.Test; - public class ConsumeStatusTest { @Test http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-a-s.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-async/broker-a-s.properties b/conf/2m-2s-async/broker-a-s.properties index a4401f8..60fddf9 100644 --- a/conf/2m-2s-async/broker-a-s.properties +++ b/conf/2m-2s-async/broker-a-s.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-a brokerId=1 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-a.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-async/broker-a.properties b/conf/2m-2s-async/broker-a.properties index 6ca12f1..367f974 100644 --- a/conf/2m-2s-async/broker-a.properties +++ b/conf/2m-2s-async/broker-a.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-b-s.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-async/broker-b-s.properties b/conf/2m-2s-async/broker-b-s.properties index 51f8daf..bcd5a16 100644 --- a/conf/2m-2s-async/broker-b-s.properties +++ b/conf/2m-2s-async/broker-b-s.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-b brokerId=1 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-async/broker-b.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-async/broker-b.properties b/conf/2m-2s-async/broker-b.properties index f7f3791..33b68fe 100644 --- a/conf/2m-2s-async/broker-b.properties +++ b/conf/2m-2s-async/broker-b.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-b brokerId=0 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-a-s.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-sync/broker-a-s.properties b/conf/2m-2s-sync/broker-a-s.properties index a4401f8..60fddf9 100644 --- a/conf/2m-2s-sync/broker-a-s.properties +++ b/conf/2m-2s-sync/broker-a-s.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-a brokerId=1 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-a.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-sync/broker-a.properties b/conf/2m-2s-sync/broker-a.properties index 135552d..b916f88 100644 --- a/conf/2m-2s-sync/broker-a.properties +++ b/conf/2m-2s-sync/broker-a.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-b-s.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-sync/broker-b-s.properties b/conf/2m-2s-sync/broker-b-s.properties index 51f8daf..bcd5a16 100644 --- a/conf/2m-2s-sync/broker-b-s.properties +++ b/conf/2m-2s-sync/broker-b-s.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-b brokerId=1 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-2s-sync/broker-b.properties ---------------------------------------------------------------------- diff --git a/conf/2m-2s-sync/broker-b.properties b/conf/2m-2s-sync/broker-b.properties index 97162a7..44fcea7 100644 --- a/conf/2m-2s-sync/broker-b.properties +++ b/conf/2m-2s-sync/broker-b.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-b brokerId=0 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-noslave/broker-a.properties ---------------------------------------------------------------------- diff --git a/conf/2m-noslave/broker-a.properties b/conf/2m-noslave/broker-a.properties index 6ca12f1..367f974 100644 --- a/conf/2m-noslave/broker-a.properties +++ b/conf/2m-noslave/broker-a.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/2m-noslave/broker-b.properties ---------------------------------------------------------------------- diff --git a/conf/2m-noslave/broker-b.properties b/conf/2m-noslave/broker-b.properties index f7f3791..33b68fe 100644 --- a/conf/2m-noslave/broker-b.properties +++ b/conf/2m-noslave/broker-b.properties @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - brokerClusterName=DefaultCluster brokerName=broker-b brokerId=0 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/broker.conf ---------------------------------------------------------------------- diff --git a/conf/broker.conf b/conf/broker.conf index 6ca12f1..0c0b28b 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -brokerClusterName=DefaultCluster -brokerName=broker-a -brokerId=0 -deleteWhen=04 -fileReservedTime=48 -brokerRole=ASYNC_MASTER -flushDiskType=ASYNC_FLUSH +brokerClusterName = DefaultCluster +brokerName = broker-a +brokerId = 0 +deleteWhen = 04 +fileReservedTime = 48 +brokerRole = ASYNC_MASTER +flushDiskType = ASYNC_FLUSH http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_broker.xml ---------------------------------------------------------------------- diff --git a/conf/logback_broker.xml b/conf/logback_broker.xml index 49e9d12..7a8f83b 100644 --- a/conf/logback_broker.xml +++ b/conf/logback_broker.xml @@ -28,7 +28,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -48,7 +48,7 @@ <maxIndex>20</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>128MB</maxFileSize> </triggeringPolicy> <encoder> @@ -71,7 +71,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -94,7 +94,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -117,7 +117,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>128MB</maxFileSize> </triggeringPolicy> <encoder> @@ -140,7 +140,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -163,7 +163,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -187,7 +187,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -210,7 +210,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -233,7 +233,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -253,7 +253,7 @@ <maxIndex>10</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>500MB</maxFileSize> </triggeringPolicy> </appender> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_filtersrv.xml ---------------------------------------------------------------------- diff --git a/conf/logback_filtersrv.xml b/conf/logback_filtersrv.xml index 8de4e08..9668795 100644 --- a/conf/logback_filtersrv.xml +++ b/conf/logback_filtersrv.xml @@ -28,7 +28,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -48,7 +48,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_namesrv.xml ---------------------------------------------------------------------- diff --git a/conf/logback_namesrv.xml b/conf/logback_namesrv.xml index 7a60c76..45ccf4f 100644 --- a/conf/logback_namesrv.xml +++ b/conf/logback_namesrv.xml @@ -28,7 +28,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -48,7 +48,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/conf/logback_tools.xml ---------------------------------------------------------------------- diff --git a/conf/logback_tools.xml b/conf/logback_tools.xml index addf211..35d33a5 100644 --- a/conf/logback_tools.xml +++ b/conf/logback_tools.xml @@ -28,7 +28,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> @@ -48,7 +48,7 @@ <maxIndex>5</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/pom.xml ---------------------------------------------------------------------- diff --git a/example/pom.xml b/example/pom.xml index 53aa6a6..efb1aa5 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 1fbb8a4..f810f5a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -17,6 +17,15 @@ package org.apache.rocketmq.example.benchmark; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -24,16 +33,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.srvutil.ServerUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; - -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicLong; public class Consumer { @@ -77,17 +76,16 @@ public class Consumer { Long[] end = snapshotList.getLast(); final long consumeTps = - (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); - final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]); - final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]); + (long)(((end[1] - begin[1]) / (double)(end[0] - begin[0])) * 1000L); + final double averageB2CRT = (end[2] - begin[2]) / (double)(end[1] - begin[1]); + final double averageS2CRT = (end[3] - begin[3]) / (double)(end[1] - begin[1]); System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", - consumeTps, averageB2CRT, averageS2CRT, end[4], end[5] + consumeTps, averageB2CRT, averageS2CRT, end[4], end[5] ); } } - @Override public void run() { try { @@ -106,7 +104,7 @@ public class Consumer { consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); long now = System.currentTimeMillis(); @@ -140,7 +138,6 @@ public class Consumer { opt.setRequired(false); options.addOption(opt); - opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false"); opt.setRequired(false); options.addOption(opt); @@ -148,7 +145,6 @@ public class Consumer { return options; } - public static void compareAndSetMax(final AtomicLong target, final long value) { long prev = target.get(); while (value > prev) { @@ -161,7 +157,6 @@ public class Consumer { } } - class StatsBenchmarkConsumer { private final AtomicLong receiveMessageTotalCount = new AtomicLong(0L); @@ -173,41 +168,35 @@ class StatsBenchmarkConsumer { private final AtomicLong store2ConsumerMaxRT = new AtomicLong(0L); - public Long[] createSnapshot() { - Long[] snap = new Long[]{ - System.currentTimeMillis(), - this.receiveMessageTotalCount.get(), - this.born2ConsumerTotalRT.get(), - this.store2ConsumerTotalRT.get(), - this.born2ConsumerMaxRT.get(), - this.store2ConsumerMaxRT.get(), + Long[] snap = new Long[] { + System.currentTimeMillis(), + this.receiveMessageTotalCount.get(), + this.born2ConsumerTotalRT.get(), + this.store2ConsumerTotalRT.get(), + this.born2ConsumerMaxRT.get(), + this.store2ConsumerMaxRT.get(), }; return snap; } - public AtomicLong getReceiveMessageTotalCount() { return receiveMessageTotalCount; } - public AtomicLong getBorn2ConsumerTotalRT() { return born2ConsumerTotalRT; } - public AtomicLong getStore2ConsumerTotalRT() { return store2ConsumerTotalRT; } - public AtomicLong getBorn2ConsumerMaxRT() { return born2ConsumerMaxRT; } - public AtomicLong getStore2ConsumerMaxRT() { return store2ConsumerMaxRT; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 3b13f94..88e9a4f 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -16,6 +16,17 @@ */ package org.apache.rocketmq.example.benchmark; +import java.io.UnsupportedEncodingException; +import java.util.LinkedList; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; @@ -24,20 +35,8 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; import org.slf4j.Logger; -import java.io.UnsupportedEncodingException; -import java.util.LinkedList; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; - public class Producer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { @@ -82,15 +81,14 @@ public class Producer { Long[] begin = snapshotList.getFirst(); Long[] end = snapshotList.getLast(); - final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + final long sendTps = (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n", - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); } } - @Override public void run() { try { @@ -202,7 +200,6 @@ public class Producer { } } - class StatsBenchmarkProducer { private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); @@ -216,46 +213,39 @@ class StatsBenchmarkProducer { private final AtomicLong sendMessageMaxRT = new AtomicLong(0L); - public Long[] createSnapshot() { - Long[] snap = new Long[]{ - System.currentTimeMillis(), - this.sendRequestSuccessCount.get(), - this.sendRequestFailedCount.get(), - this.receiveResponseSuccessCount.get(), - this.receiveResponseFailedCount.get(), - this.sendMessageSuccessTimeTotal.get(), + Long[] snap = new Long[] { + System.currentTimeMillis(), + this.sendRequestSuccessCount.get(), + this.sendRequestFailedCount.get(), + this.receiveResponseSuccessCount.get(), + this.receiveResponseFailedCount.get(), + this.sendMessageSuccessTimeTotal.get(), }; return snap; } - public AtomicLong getSendRequestSuccessCount() { return sendRequestSuccessCount; } - public AtomicLong getSendRequestFailedCount() { return sendRequestFailedCount; } - public AtomicLong getReceiveResponseSuccessCount() { return receiveResponseSuccessCount; } - public AtomicLong getReceiveResponseFailedCount() { return receiveResponseFailedCount; } - public AtomicLong getSendMessageSuccessTimeTotal() { return sendMessageSuccessTimeTotal; } - public AtomicLong getSendMessageMaxRT() { return sendMessageMaxRT; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 43f159b..ce4b1ab 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -17,12 +17,6 @@ package org.apache.rocketmq.example.benchmark; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.client.producer.*; - import java.io.UnsupportedEncodingException; import java.util.LinkedList; import java.util.Timer; @@ -30,6 +24,15 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.common.RemotingHelper; public class TransactionProducer { private static int threadCount; @@ -37,7 +40,6 @@ public class TransactionProducer { private static boolean ischeck; private static boolean ischeckffalse; - public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; @@ -71,16 +73,15 @@ public class TransactionProducer { Long[] end = snapshotList.getLast(); final long sendTps = - (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); System.out.printf( - "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", - sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]); + "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4], end[6]); } } - @Override public void run() { try { @@ -92,7 +93,7 @@ public class TransactionProducer { }, 10000, 10000); final TransactionCheckListener transactionCheckListener = - new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); + new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setTransactionCheckListener(transactionCheckListener); @@ -110,7 +111,7 @@ public class TransactionProducer { // Thread.sleep(1000); final long beginTimestamp = System.currentTimeMillis(); SendResult sendResult = - producer.sendMessageInTransaction(msg, tranExecuter, null); + producer.sendMessageInTransaction(msg, tranExecuter, null); if (sendResult != null) { statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); @@ -121,8 +122,8 @@ public class TransactionProducer { long prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); while (currentRT > prevMaxRT) { boolean updated = - statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, - currentRT); + statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, + currentRT); if (updated) break; @@ -137,7 +138,6 @@ public class TransactionProducer { } } - private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException { Message msg = new Message(); msg.setTopic("BenchmarkTest"); @@ -153,17 +153,14 @@ public class TransactionProducer { } } - class TransactionExecuterBImpl implements LocalTransactionExecuter { private boolean ischeck; - public TransactionExecuterBImpl(boolean ischeck) { this.ischeck = ischeck; } - @Override public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { if (ischeck) { @@ -173,19 +170,16 @@ class TransactionExecuterBImpl implements LocalTransactionExecuter { } } - class TransactionCheckListenerBImpl implements TransactionCheckListener { private boolean ischeckffalse; private StatsBenchmarkTProducer statsBenchmarkTProducer; - public TransactionCheckListenerBImpl(boolean ischeckffalse, - StatsBenchmarkTProducer statsBenchmarkTProducer) { + StatsBenchmarkTProducer statsBenchmarkTProducer) { this.ischeckffalse = ischeckffalse; this.statsBenchmarkTProducer = statsBenchmarkTProducer; } - @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); @@ -198,7 +192,6 @@ class TransactionCheckListenerBImpl implements TransactionCheckListener { } } - class StatsBenchmarkTProducer { private final AtomicLong sendRequestSuccessCount = new AtomicLong(0L); @@ -214,51 +207,43 @@ class StatsBenchmarkTProducer { private final AtomicLong checkRequestSuccessCount = new AtomicLong(0L); - public Long[] createSnapshot() { - Long[] snap = new Long[]{ - System.currentTimeMillis(), - this.sendRequestSuccessCount.get(), - this.sendRequestFailedCount.get(), - this.receiveResponseSuccessCount.get(), - this.receiveResponseFailedCount.get(), - this.sendMessageSuccessTimeTotal.get(), - this.checkRequestSuccessCount.get()}; + Long[] snap = new Long[] { + System.currentTimeMillis(), + this.sendRequestSuccessCount.get(), + this.sendRequestFailedCount.get(), + this.receiveResponseSuccessCount.get(), + this.receiveResponseFailedCount.get(), + this.sendMessageSuccessTimeTotal.get(), + this.checkRequestSuccessCount.get()}; return snap; } - public AtomicLong getSendRequestSuccessCount() { return sendRequestSuccessCount; } - public AtomicLong getSendRequestFailedCount() { return sendRequestFailedCount; } - public AtomicLong getReceiveResponseSuccessCount() { return receiveResponseSuccessCount; } - public AtomicLong getReceiveResponseFailedCount() { return receiveResponseFailedCount; } - public AtomicLong getSendMessageSuccessTimeTotal() { return sendMessageSuccessTimeTotal; } - public AtomicLong getSendMessageMaxRT() { return sendMessageMaxRT; } - public AtomicLong getCheckRequestSuccessCount() { return checkRequestSuccessCount; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java index aa62a1e..6301b3b 100644 --- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.broadcast; +import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -25,8 +26,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.util.List; - public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { @@ -42,7 +41,7 @@ public class PushConsumer { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java index d0a41f1..8d0fbe4 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.filter; +import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -24,9 +25,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - - public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { @@ -34,13 +32,13 @@ public class Consumer { String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); consumer.subscribe("TopicFilter7", "org.apache.rocketmq.example.filter.MessageFilterImpl", - filterCode); + filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java index d58c28d..a2dba6c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java @@ -30,9 +30,9 @@ public class Producer { try { for (int i = 0; i < 6000000; i++) { Message msg = new Message("TopicFilter7", - "TagA", - "OrderID001", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + "TagA", + "OrderID001", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.putUserProperty("SequenceId", String.valueOf(i)); SendResult sendResult = producer.send(msg); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java index a6a3aca..ec7d6ef 100644 --- a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java @@ -16,22 +16,20 @@ */ package org.apache.rocketmq.example.operation; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.MessageExt; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; public class Consumer { @@ -51,10 +49,9 @@ public class Consumer { consumer.registerMessageListener(new MessageListenerConcurrently() { AtomicLong consumeTimes = new AtomicLong(0); - @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { long currentTimes = this.consumeTimes.incrementAndGet(); System.out.printf("%-8d %s%n", currentTimes, msgs); if (Boolean.parseBoolean(returnFailedHalf)) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java index 54e256b..663acd0 100644 --- a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java @@ -6,22 +6,27 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.example.operation; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.commons.cli.*; public class Producer { @@ -42,10 +47,10 @@ public class Producer { for (int i = 0; i < Integer.parseInt(msgCount); i++) { try { Message msg = new Message( - topic, - tags, - keys, - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + topic, + tags, + keys, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%-8d %s%n", i, sendResult); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java index 7ddfbf7..0a25402 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.example.ordermessage; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; @@ -24,10 +26,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - - public class Consumer { public static void main(String[] args) throws MQClientException { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java index 84c1da4..7abbb5a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.example.ordermessage; +import java.io.UnsupportedEncodingException; +import java.util.List; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -27,25 +29,22 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; -import java.io.UnsupportedEncodingException; -import java.util.List; - public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); - String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; + String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; Message msg = - new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { - Integer id = (Integer) arg; + Integer id = (Integer)arg; int index = id % mqs.size(); return mqs.get(index); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index 43566f0..513c269 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.quickstart; +import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -24,8 +25,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { @@ -39,7 +38,7 @@ public class Consumer { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, - ConsumeConcurrentlyContext context) { + ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index f6bd5df..a74d9df 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -32,8 +32,8 @@ public class Producer { for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest", - "TagA", - ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) + "TagA", + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java index 68dbb67..d4d9975 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.simple; +import java.io.UnsupportedEncodingException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; @@ -23,9 +24,6 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; -import java.io.UnsupportedEncodingException; - - public class AsyncProducer { public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { @@ -37,9 +35,9 @@ public class AsyncProducer { try { final int index = i; Message msg = new Message("Jodie_topic_1023", - "TagA", - "OrderID188", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java index 2b4ce23..54bf54f 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/CachedQueue.java @@ -6,26 +6,23 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.example.simple; -import org.apache.rocketmq.common.message.MessageExt; - import java.util.TreeMap; - +import org.apache.rocketmq.common.message.MessageExt; public class CachedQueue { private final TreeMap<Long, MessageExt> msgCachedTable = new TreeMap<Long, MessageExt>(); - public TreeMap<Long, MessageExt> getMsgCachedTable() { return msgCachedTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java index b035d57..590bcee 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -22,7 +22,6 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; - public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { @@ -34,9 +33,9 @@ public class Producer { try { { Message msg = new Message("TopicTest", - "TagA", - "OrderID188", - "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java index 8c9ba15..c468f3a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java @@ -16,19 +16,17 @@ */ package org.apache.rocketmq.example.simple; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageQueue; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - public class PullConsumer { private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>(); - public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); @@ -41,7 +39,7 @@ public class PullConsumer { while (true) { try { PullResult pullResult = - consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); + consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java index d38d679..b6bc8d2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java @@ -26,7 +26,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; - public class PullScheduleService { public static void main(String[] args) throws MQClientException { @@ -59,7 +58,6 @@ public class PullScheduleService { } consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); - context.setPullNextDelayTimeMillis(100); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java index 5929aff..78bb922 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.simple; +import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -24,9 +25,6 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - - public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException {