Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-12-01 Thread via GitHub


RongtongJin merged PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-30 Thread via GitHub


codecov-commenter commented on PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#issuecomment-2509594030

   ## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/8915) Report
   Attention: Patch coverage is `61.83575%` with `79 lines` in your changes missing coverage. Please review.
   > Project coverage is 47.70%. Comparing base [(`2bbc852`)](https://app.codecov.io/gh/apache/rocketmq/commit/2bbc852361132db8697a5788197aa31f5e89d4a1) to head [(`a20b1e7`)](https://app.codecov.io/gh/apache/rocketmq/commit/a20b1e7d0d738ac4deac3bba7f4bb476f51f02e6).
   > Report is 29 commits behind head on develop.
   
   | Files with missing lines | Patch % | Lines |
   |---|---|---|
   | broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java | 69.33% | 19 Missing and 4 partials :warning: |
   | common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java | 0.00% | 13 Missing :warning: |
   | store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java | 70.45% | 9 Missing and 4 partials :warning: |
   | common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java | 0.00% | 8 Missing :warning: |
   | store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java | 77.77% | 6 Missing and 2 partials :warning: |
   | store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 14.28% | [4 Missin

Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-19 Thread via GitHub


lizhanhui commented on PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#issuecomment-2487261107

   > Tests in error: RocksdbTransferOffsetAndCqTest.testRocksdbCqWrite:147 NullPointer
   
   I have had a few busy days; I will fix it when I have some bandwidth.





Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-18 Thread via GitHub


RongtongJin commented on PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#issuecomment-2482303231

   Tests in error: RocksdbTransferOffsetAndCqTest.testRocksdbCqWrite:147 NullPointer





Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-14 Thread via GitHub


lizhanhui commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842045934


##
common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java:
##
@@ -121,14 +121,16 @@ protected void initWriteOptions() {
         this.writeOptions = new WriteOptions();
         this.writeOptions.setSync(false);
         this.writeOptions.setDisableWAL(true);
-        this.writeOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.writeOptions.setNoSlowdown(false);
     }
 
     protected void initAbleWalWriteOptions() {
         this.ableWalWriteOptions = new WriteOptions();
         this.ableWalWriteOptions.setSync(false);
         this.ableWalWriteOptions.setDisableWAL(false);
-        this.ableWalWriteOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.ableWalWriteOptions.setNoSlowdown(false);

Review Comment:
   Check out RocksGroupCommitService: the group-commit thread is responsible 
for triggering batch writes to RocksDB and awaiting their completion. If the 
group-commit buffer (bounded, 100k entries by default) is full, it 
back-pressures the main dispatch thread, which in turn raises the dispatch-lag 
metric alarms.
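
   The back-pressure described above can be sketched roughly as follows. This 
is a minimal illustration, not the code from the PR: the class name 
`BoundedGroupCommitBuffer` and its methods are hypothetical, and only the 
`LinkedBlockingQueue` calls are real JDK APIs. A producer that cannot enqueue 
within the timeout gets `false` back and has to wait or retry, which is how a 
full buffer slows the dispatch side down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded group-commit buffer (not the PR's class).
final class BoundedGroupCommitBuffer<T> {

    private final LinkedBlockingQueue<T> buffer;

    BoundedGroupCommitBuffer(int capacity) {
        // A bounded queue: offers fail once `capacity` entries are pending.
        this.buffer = new LinkedBlockingQueue<>(capacity);
    }

    // Producer side: waits up to the timeout for free space.
    // Returns false when the buffer stayed full, i.e. the caller is back-pressured.
    boolean tryPut(T request, long timeout, TimeUnit unit) throws InterruptedException {
        return buffer.offer(request, timeout, unit);
    }

    // Consumer side: removes at most maxBatch pending entries for one batch write.
    List<T> drainBatch(int maxBatch) {
        List<T> batch = new ArrayList<>(maxBatch);
        buffer.drainTo(batch, maxBatch);
        return batch;
    }

    int size() {
        return buffer.size();
    }
}
```

   In this sketch the caller decides what to do on `false` (the PR's 
`putRequest` logs a warning and keeps retrying); draining a batch frees space 
so that producers can make progress again.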
   






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-14 Thread via GitHub


lizhanhui commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842041548


##
store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;

Review Comment:
   Good catch, will fix it in the next commit.






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-14 Thread via GitHub


lizhanhui commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842040726


##
broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java:
##
@@ -105,6 +141,15 @@ public byte[] get(ByteBuffer key) throws RocksDBException {
 
     public void write(WriteBatch writeBatch) throws RocksDBException {
         db.write(ableWalWriteOptions, writeBatch);
+        accountWriteOpsForWalFlush();
+    }
+
+    private void accountWriteOpsForWalFlush() throws RocksDBException {
+        int writeCount = writeOpsCounter.incrementAndGet();
+        if (writeCount >= messageStoreConfig.getRocksdbFlushWalFrequency()) {
+            this.db.flushWal(false);

Review Comment:
   The frequency of commit-offset writes may be very high, so I prefer to keep 
this periodic flush async.
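
   A rough sketch of the count-based flush trigger discussed here, under 
stated assumptions: `WriteCountFlushTrigger` and `FlushAction` are hypothetical 
names, and the counter reset is my assumption, since the quoted hunk is cut off 
before that point. In the PR the action would be `db.flushWal(false)`, which 
flushes the WAL buffer to the file without forcing a sync; `flushWal(true)` 
additionally syncs, which costs more on a hot write path.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: run a flush action once every `frequency` writes.
final class WriteCountFlushTrigger {

    // Stand-in for the real flush call, e.g. db.flushWal(false) in the PR.
    interface FlushAction {
        void flush() throws Exception;
    }

    private final AtomicInteger writeOps = new AtomicInteger();
    private final int frequency;
    private final FlushAction action;

    WriteCountFlushTrigger(int frequency, FlushAction action) {
        this.frequency = frequency;
        this.action = action;
    }

    // Call after each write. Returns true when this write triggered a flush.
    boolean onWrite() throws Exception {
        int count = writeOps.incrementAndGet();
        if (count >= frequency) {
            // Assumption: reset the counter so the next flush is `frequency` writes away.
            // Under concurrent writers this reset is approximate, which is fine for a
            // best-effort periodic flush.
            writeOps.set(0);
            action.flush();
            return true;
        }
        return false;
    }
}
```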






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-14 Thread via GitHub


lizhanhui commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842038737


##
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java:
##
@@ -500,6 +529,16 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)
         return oldLogic != null ? oldLogic : newLogic;
     }
 
+    @Override
+    public ConcurrentMap<Integer, ConsumeQueueInterface> findConsumeQueueMap(String topic) {
+        if (MixAll.isLmq(topic)) {
+            ConcurrentMap<Integer, ConsumeQueueInterface> result = new ConcurrentHashMap<>(1);
+            result.put(MixAll.LMQ_QUEUE_ID, findOrCreateConsumeQueue(topic, MixAll.LMQ_QUEUE_ID));
+            return result;
+        }
+        return super.findConsumeQueueMap(topic);
+    }

Review Comment:
   This override should not have been included in the pull request; it only 
makes sense internally.






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-14 Thread via GitHub


lizhanhui commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842020934


##
store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;
+        while (!buffer.isEmpty() && !interrupted) {
+            DispatchRequest dispatchRequest = buffer.poll();
+            if (null != dispatchRequest) {
+                requests.add(dispatchRequest);
+            }
+
+            if (requests.isEmpty()) {
+                break;
+            }
+
+            if (null == dispatchRequest || requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) {
+                while (!store.isStopped()) {
+                    try {
+                        // putMessagePosition will clear requests after consume queue building completion
+                        store.putMessagePosition(requests);
+                        break;
+                    } catch (RocksDBException e) {
+                        log.error("Failed to build consume queue in RocksDB", e);
+                    }

Review Comment:
   Overall, yes: it retries until it is 1) interrupted by SIGTERM; 2) recovered 
from the failure by an internal restart; or 3) blocked because the RocksDB 
thread has gone into state D.
   
   If RocksDB encounters an unrecoverable failure, for example due to hardware 
failure, this thread will be blocked, because the write options prefer a write 
stall over fast failure.
   If RocksDB experiences recoverable failures, sampling the error log looks 
reasonable.
   
   If you are suggesting an alternative, go ahead; comments are welcome.
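
   One possible shape for a retry loop with a pause and sampled error logging, 
as raised in this exchange. This is only a sketch under assumptions: the class 
`PausedRetry`, its parameters, and the use of `System.err` in place of the 
broker's logger are all hypothetical and not taken from the PR.

```java
import java.util.concurrent.Callable;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: retry a task until it succeeds or the owner is stopped.
final class PausedRetry {

    // Returns the number of attempts it took to succeed, or -1 if `stopped`
    // became true before the task succeeded.
    static int retryUntilStopped(Callable<Void> task, BooleanSupplier stopped,
                                 long pauseMillis, int logEvery) throws InterruptedException {
        int failures = 0;
        while (!stopped.getAsBoolean()) {
            try {
                task.call();
                return failures + 1;
            } catch (InterruptedException e) {
                // Let interruption end the loop instead of being retried.
                throw e;
            } catch (Exception e) {
                failures++;
                // Sampled logging: report the 1st failure, then every logEvery-th one.
                if ((failures - 1) % logEvery == 0) {
                    System.err.println("Attempt failed (" + failures + " failures so far): " + e);
                }
                // Pause so that a persistent failure does not turn into a busy loop.
                Thread.sleep(pauseMillis);
            }
        }
        return -1;
    }
}
```

   The pause bounds CPU usage while the failure persists, and the sampling 
keeps a long outage from flooding the log; both values would need tuning for a 
real broker.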






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-14 Thread via GitHub


lizhanhui commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842003505


##
store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java:
##
@@ -144,10 +146,8 @@ public static DBOptions createDBOptions() {
 setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
 setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
 setManualWalFlush(true).
-setMaxTotalWalSize(0).

Review Comment:
   ConsumeQueue uses AtomicFlush with the WAL disabled, so this option does not 
make sense here.






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-13 Thread via GitHub


fuyou001 commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1841648217


##
common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java:
##
@@ -121,14 +121,16 @@ protected void initWriteOptions() {
         this.writeOptions = new WriteOptions();
         this.writeOptions.setSync(false);
         this.writeOptions.setDisableWAL(true);
-        this.writeOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.writeOptions.setNoSlowdown(false);
     }
 
     protected void initAbleWalWriteOptions() {
         this.ableWalWriteOptions = new WriteOptions();
         this.ableWalWriteOptions.setSync(false);
         this.ableWalWriteOptions.setDisableWAL(false);
-        this.ableWalWriteOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.ableWalWriteOptions.setNoSlowdown(false);

Review Comment:
   Without fast failure, writes may block.



##
store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;

Review Comment:
   The `interrupted` variable is not used.



##
broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java:
##
@@ -105,6 +141,15 @@ public byte[] get(ByteBuffer key) throws RocksDBException {
 
     public void write(WriteBatch writeBatch) throws RocksDBException {
         db.write(ableWalWriteOptions, writeBatch);
+        accountWriteOpsForWalFlush();
+    }
+
+    private void accountWriteOpsForWalFlush() throws RocksDBException {
+        int writeCount = writeOpsCounter.incrementAndGet();
+        if (writeCount >= messageStoreConfig.getRocksdbFlushWalFrequency()) {
+            this.db.flushWal(false);

Review Comment:
   flushWal(true) may be better here.






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-13 Thread via GitHub


RongtongJin commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1841497459


##
store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;
+        while (!buffer.isEmpty() && !interrupted) {
+            DispatchRequest dispatchRequest = buffer.poll();
+            if (null != dispatchRequest) {
+                requests.add(dispatchRequest);
+            }
+
+            if (requests.isEmpty()) {
+                break;
+            }
+
+            if (null == dispatchRequest || requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) {
+                while (!store.isStopped()) {
+                    try {
+                        // putMessagePosition will clear requests after consume queue building completion
+                        store.putMessagePosition(requests);
+                        break;
+                    } catch (RocksDBException e) {
+                        log.error("Failed to build consume queue in RocksDB", e);
+                    }

Review Comment:
   If a RocksDBException is thrown here, this leads to an endless loop without 
any pause. Is this expected behavior?



##
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java:
##
@@ -500,6 +529,16 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)
         return oldLogic != null ? oldLogic : newLogic;
     }
 
+    @Override
+    public ConcurrentMap<Integer, ConsumeQueueInterface> findConsumeQueueMap(String topic) {
+        if (MixAll.isLmq(topic)) {
+            ConcurrentMap<Integer, ConsumeQueueInterface> result = new ConcurrentHashMap<>(1);
+            result.put(MixAll.LMQ_QUEUE_ID, findOrCreateConsumeQueue(topic, MixAll.LMQ_QUEUE_ID));
+            return result;
+        }
+        return super.findConsumeQueueMap(topic);
+    }

Review Comment:
   I don't quite understand the necessity of overriding this method. Could you 
please clarify?






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-13 Thread via GitHub


tianliuliu commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1841472095


##
store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java:
##
@@ -144,10 +146,8 @@ public static DBOptions createDBOptions() {
 setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
 setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
 setManualWalFlush(true).
-setMaxTotalWalSize(0).

Review Comment:
   Multiple CFs (column families) use this DB. Maybe you could set a 
MaxTotalWalSize to implement WAL switching?






Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]

2024-11-13 Thread via GitHub


tianliuliu commented on code in PR #8915:
URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1841472095


##
store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java:
##
@@ -144,10 +146,8 @@ public static DBOptions createDBOptions() {
 setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
 setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
 setManualWalFlush(true).
-setMaxTotalWalSize(0).

Review Comment:
   Multiple CFs (column families) use this DB; a MaxTotalWalSize can be set to 
implement WAL switching.


