Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
RongtongJin merged PR #8915: URL: https://github.com/apache/rocketmq/pull/8915 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
codecov-commenter commented on PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#issuecomment-2509594030

## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/8915?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
Attention: Patch coverage is `61.83575%` with `79 lines` in your changes missing coverage. Please review.
> Project coverage is 47.70%. Comparing base [(`2bbc852`)](https://app.codecov.io/gh/apache/rocketmq/commit/2bbc852361132db8697a5788197aa31f5e89d4a1?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`a20b1e7`)](https://app.codecov.io/gh/apache/rocketmq/commit/a20b1e7d0d738ac4deac3bba7f4bb476f51f02e6?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 29 commits behind head on develop.

| [Files with missing lines](https://app.codecov.io/gh/apache/rocketmq/pull/8915?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [...pache/rocketmq/broker/config/v2/ConfigStorage.java](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&filepath=broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fbroker%2Fconfig%2Fv2%2FConfigStorage.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvY29uZmlnL3YyL0NvbmZpZ1N0b3JhZ2UuamF2YQ==) | 69.33% | [19 Missing and 4 partials :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...rocketmq/common/config/AbstractRocksDBStorage.java](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fcommon%2Fconfig%2FAbstractRocksDBStorage.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vY29uZmlnL0Fic3RyYWN0Um9ja3NEQlN0b3JhZ2UuamF2YQ==) | 0.00% | [13 Missing :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...rocketmq/store/queue/RocksDBConsumeQueueStore.java](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&filepath=store%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fstore%2Fqueue%2FRocksDBConsumeQueueStore.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3F1ZXVlL1JvY2tzREJDb25zdW1lUXVldWVTdG9yZS5qYXZh) | 70.45% | [9 Missing and 4 partials :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...rg/apache/rocketmq/common/config/ConfigHelper.java](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&filepath=common%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fcommon%2Fconfig%2FConfigHelper.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vY29uZmlnL0NvbmZpZ0hlbHBlci5qYXZh) | 0.00% | [8 Missing :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [.../rocketmq/store/queue/RocksGroupCommitService.java](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&filepath=store%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fstore%2Fqueue%2FRocksGroupCommitService.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3F1ZXVlL1JvY2tzR3JvdXBDb21taXRTZXJ2aWNlLmphdmE=) | 77.77% | [6 Missing and 2 partials :warning: ](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
| [...main/java/org/apache/rocketmq/store/CommitLog.java](https://app.codecov.io/gh/apache/rocketmq/pull/8915?src=pr&el=tree&filepath=store%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Frocketmq%2Fstore%2FCommitLog.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0NvbW1pdExvZy5qYXZh) | 14.28% | [4 Missin
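The Codecov figures above are mutually consistent if patch coverage is computed as hits / (hits + misses + partials); that definition is our assumption here. Under it, 79 uncovered lines at 61.83575% imply 207 tracked lines in the patch, 128 of them hit, and ConfigStorage.java's 19 missing plus 4 partial lines at 69.33% imply 52 hits. The hit counts are inferred, not reported. A small sketch of the arithmetic (the class name PatchCoverage is hypothetical):

```java
// Sketch, assuming coverage = hits / (hits + misses + partials).
// Hit counts used below are inferred from the report, not quoted from it.
public final class PatchCoverage {

    /** Percentage of tracked lines that are fully hit. */
    static double percent(int hits, int misses, int partials) {
        int tracked = hits + misses + partials;
        return tracked == 0 ? 100.0 : 100.0 * hits / tracked;
    }

    /** Infers the hit count from the uncovered-line count and the reported percentage. */
    static long impliedHits(int uncovered, double percent) {
        double p = percent / 100.0;
        return Math.round(uncovered * p / (1.0 - p));
    }

    public static void main(String[] args) {
        // ConfigStorage.java: 19 missing + 4 partials reported at 69.33%
        System.out.printf("ConfigStorage: %.2f%%%n", percent(52, 19, 4));
        // Whole patch: 79 uncovered lines reported at 61.83575%
        long hits = impliedHits(79, 61.83575);
        System.out.printf("patch: %d hits of %d tracked lines (%.5f%%)%n",
                hits, hits + 79, percent((int) hits, 79, 0));
    }
}
```

The same check works per file: RocksGroupCommitService.java's 8 uncovered lines at 77.77% imply 28 hits out of 36.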
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
lizhanhui commented on PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#issuecomment-2487261107
> Tests in error: RocksdbTransferOffsetAndCqTest.testRocksdbCqWrite:147 NullPointer

I've had a few busy days; I'll fix it when I have some bandwidth.
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
RongtongJin commented on PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#issuecomment-2482303231
Tests in error: RocksdbTransferOffsetAndCqTest.testRocksdbCqWrite:147 NullPointer
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
lizhanhui commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842045934
## common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java: ##
@@ -121,14 +121,16 @@ protected void initWriteOptions() {
         this.writeOptions = new WriteOptions();
         this.writeOptions.setSync(false);
         this.writeOptions.setDisableWAL(true);
-        this.writeOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.writeOptions.setNoSlowdown(false);
     }

     protected void initAbleWalWriteOptions() {
         this.ableWalWriteOptions = new WriteOptions();
         this.ableWalWriteOptions.setSync(false);
         this.ableWalWriteOptions.setDisableWAL(false);
-        this.ableWalWriteOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.ableWalWriteOptions.setNoSlowdown(false);

Review Comment:
Check out RocksGroupCommitService: the group-commit thread is responsible for triggering batch writes to RocksDB and waiting for them to complete. If the group-commit buffer (bounded, 100k entries by default) is full, it back-pressures the main dispatch thread, which is what raises the dispatch-lag metric alarms.
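The back-pressure described in this comment can be sketched without RocketMQ or RocksDB types. Everything below is hypothetical: GroupCommitSketch stands in for RocksGroupCommitService, and the batchWriter callback stands in for one batched RocksDB write (store.putMessagePosition in the PR). The dispatch thread's put() keeps blocking, re-offering every 3 seconds, while the bounded buffer is full; the committer drains at most batchSize requests per write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Minimal sketch of a bounded group-commit buffer (hypothetical class, not RocketMQ's).
// The dispatch thread calls put(); a single committer thread calls commitOnce() in a loop.
public final class GroupCommitSketch<T> {
    private final LinkedBlockingQueue<T> buffer;
    private final int batchSize;
    private final Consumer<List<T>> batchWriter; // stands in for one batched RocksDB write

    GroupCommitSketch(int capacity, int batchSize, Consumer<List<T>> batchWriter) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
        this.batchSize = batchSize;
        this.batchWriter = batchWriter;
    }

    /** Blocks while the buffer is full (back-pressure on dispatch); returns false if interrupted. */
    boolean put(T request) {
        try {
            while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
                System.err.println("buffer full for 3s, dispatch is being back-pressured");
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Drains the buffer in batches of at most batchSize; returns the number of batches written. */
    int commitOnce() {
        int batches = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (buffer.drainTo(batch, batchSize) > 0) {
            batchWriter.accept(batch);
            batch = new ArrayList<>(batchSize); // fresh list: the writer may keep the previous one
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        GroupCommitSketch<Integer> svc = new GroupCommitSketch<>(1_000, 256, b -> sizes.add(b.size()));
        for (int i = 0; i < 600; i++) {
            svc.put(i);
        }
        System.out.println(svc.commitOnce() + " batches, sizes " + sizes);
    }
}
```

Using drainTo rather than one poll() per element is a choice of this sketch; the PR's doCommit, as quoted, polls one request at a time and writes once 256 requests are collected or poll() returns null.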
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
lizhanhui commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842041548
## store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java: ##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;

Review Comment:
Good catch, will fix it in the next commit.
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
lizhanhui commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842040726
## broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java: ##
@@ -105,6 +141,15 @@ public byte[] get(ByteBuffer key) throws RocksDBException {
     public void write(WriteBatch writeBatch) throws RocksDBException {
         db.write(ableWalWriteOptions, writeBatch);
+        accountWriteOpsForWalFlush();
+    }
+
+    private void accountWriteOpsForWalFlush() throws RocksDBException {
+        int writeCount = writeOpsCounter.incrementAndGet();
+        if (writeCount >= messageStoreConfig.getRocksdbFlushWalFrequency()) {
+            this.db.flushWal(false);

Review Comment:
Commit-offset requests may arrive at a very high rate, so I prefer to keep this periodic flush asynchronous, i.e. flushWal(false).
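To make the flushWal(false) versus flushWal(true) trade-off concrete, here is a toy model, not RocksDB code. It assumes the DB runs with RocksDB's manual_wal_flush option, under which WAL records stay in an in-process buffer until FlushWAL is called; FlushWAL with sync=false hands them to the OS, and sync=true additionally syncs them to disk. The class ManualWalFlushModel and its fields are hypothetical; flushFrequency plays the role of getRocksdbFlushWalFrequency().

```java
// Toy model (hypothetical, not RocksDB code) of counter-triggered WAL flushing
// when RocksDB's manual_wal_flush option is enabled.
public final class ManualWalFlushModel {
    private final int flushFrequency;  // analogous to getRocksdbFlushWalFrequency()
    private final boolean syncOnFlush; // false ~ flushWal(false), true ~ flushWal(true)
    private int writesSinceFlush;
    private long appended;    // bytes appended to the in-process WAL buffer
    private long writtenUpTo; // bytes handed to the OS by a flush
    private long syncedUpTo;  // bytes synced to stable storage

    ManualWalFlushModel(int flushFrequency, boolean syncOnFlush) {
        this.flushFrequency = flushFrequency;
        this.syncOnFlush = syncOnFlush;
    }

    /** Records one write and flushes the WAL buffer every flushFrequency writes. */
    void write(long bytes) {
        appended += bytes;
        if (++writesSinceFlush >= flushFrequency) {
            flushWal(syncOnFlush);
            writesSinceFlush = 0;
        }
    }

    void flushWal(boolean sync) {
        writtenUpTo = appended;       // in-process buffer -> OS page cache
        if (sync) {
            syncedUpTo = writtenUpTo; // page cache -> stable storage
        }
    }

    /** Bytes lost if only the broker process crashes. */
    long lossOnProcessCrash() {
        return appended - writtenUpTo;
    }

    /** Bytes lost on power failure, ignoring the OS's own background writeback. */
    long lossOnPowerFailure() {
        return appended - syncedUpTo;
    }

    public static void main(String[] args) {
        for (boolean sync : new boolean[] {false, true}) {
            ManualWalFlushModel wal = new ManualWalFlushModel(3, sync);
            for (int i = 0; i < 7; i++) {
                wal.write(10);
            }
            System.out.printf("sync=%b: process-crash loss=%d, power-failure loss=%d%n",
                    sync, wal.lossOnProcessCrash(), wal.lossOnPowerFailure());
        }
    }
}
```

In this model both settings bound a process crash to the writes since the last flush; sync=true additionally bounds power-failure loss, at the price of a disk sync on the commit-offset path, which is the cost the comment above wants to avoid.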
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
lizhanhui commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842038737
## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java: ##
@@ -500,6 +529,16 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)
         return oldLogic != null ? oldLogic : newLogic;
     }

+    @Override
+    public ConcurrentMap<Integer, ConsumeQueueInterface> findConsumeQueueMap(String topic) {
+        if (MixAll.isLmq(topic)) {
+            ConcurrentMap<Integer, ConsumeQueueInterface> result = new ConcurrentHashMap<>(1);
+            result.put(MixAll.LMQ_QUEUE_ID, findOrCreateConsumeQueue(topic, MixAll.LMQ_QUEUE_ID));
+            return result;
+        }
+        return super.findConsumeQueueMap(topic);
+    }

Review Comment:
This override should NOT have been included in this pull request; it only makes sense internally.
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
lizhanhui commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842020934
## store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java: ##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;
+        while (!buffer.isEmpty() && !interrupted) {
+            DispatchRequest dispatchRequest = buffer.poll();
+            if (null != dispatchRequest) {
+                requests.add(dispatchRequest);
+            }
+
+            if (requests.isEmpty()) {
+                break;
+            }
+
+            if (null == dispatchRequest || requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) {
+                while (!store.isStopped()) {
+                    try {
+                        // putMessagePosition will clear requests after consume queue building completion
+                        store.putMessagePosition(requests);
+                        break;
+                    } catch (RocksDBException e) {
+                        log.error("Failed to build consume queue in RocksDB", e);
+                    }

Review Comment:
Overall, yes: it retries until 1) it is interrupted by SIGTERM; 2) the failure is recovered by an internal restart; or 3) it blocks because the RocksDB thread enters state D (uninterruptible sleep). If RocksDB hits an unrecoverable failure, for example due to a hardware fault, this thread will block, because the write options now prefer a write stall over fast failure. If RocksDB experiences recoverable failures, sampling the error log looks reasonable. If you have an alternative in mind, go ahead; comments are welcome.
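One alternative to a tight retry loop, offered only as a sketch: pause between attempts with capped exponential backoff and log only a sample of the failures. RetryWithBackoff, Attempt, and the parameters are hypothetical; in the PR the attempt would correspond to store.putMessagePosition(requests) and the stop condition to store.isStopped().

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical retry helper: capped exponential backoff plus sampled error logging.
public final class RetryWithBackoff {

    @FunctionalInterface
    interface Attempt {
        void run() throws Exception;
    }

    /**
     * Retries the attempt until it succeeds, stopped becomes true, or the thread is interrupted.
     * Returns true only if the attempt eventually succeeded.
     */
    static boolean runUntilSuccess(Attempt attempt, BooleanSupplier stopped,
                                   long initialBackoffMs, long maxBackoffMs, int logEvery) {
        long backoffMs = initialBackoffMs;
        int failures = 0;
        while (!stopped.getAsBoolean()) {
            try {
                attempt.run();
                return true;
            } catch (Exception e) {
                failures++;
                if ((failures - 1) % logEvery == 0) { // log the 1st, (logEvery + 1)th, ... failure
                    System.err.println("attempt failed " + failures + " time(s): " + e);
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(backoffMs); // pause instead of spinning
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
            backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean ok = runUntilSuccess(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated RocksDB failure");
            }
        }, () -> false, 1, 8, 10);
        System.out.println("succeeded=" + ok + " after " + calls[0] + " attempts");
    }
}
```

The sleep bounds CPU use during a persistent failure and the sampling keeps the log from flooding; the cost is extra latency before each retry.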
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
lizhanhui commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1842003505
## store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java: ##
@@ -144,10 +146,8 @@ public static DBOptions createDBOptions() {
             setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
             setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
             setManualWalFlush(true).
-            setMaxTotalWalSize(0).

Review Comment:
ConsumeQueue uses AtomicFlush and has the WAL disabled, so this option does not make sense here.
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
fuyou001 commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1841648217
## common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java: ##
@@ -121,14 +121,16 @@ protected void initWriteOptions() {
         this.writeOptions = new WriteOptions();
         this.writeOptions.setSync(false);
         this.writeOptions.setDisableWAL(true);
-        this.writeOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.writeOptions.setNoSlowdown(false);
     }

     protected void initAbleWalWriteOptions() {
         this.ableWalWriteOptions = new WriteOptions();
         this.ableWalWriteOptions.setSync(false);
         this.ableWalWriteOptions.setDisableWAL(false);
-        this.ableWalWriteOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.ableWalWriteOptions.setNoSlowdown(false);

Review Comment:
There is no fast failure any more, so writes may block.

## store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java: ##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;

Review Comment:
The interrupted variable is never used.

## broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java: ##
@@ -105,6 +141,15 @@ public byte[] get(ByteBuffer key) throws RocksDBException {
     public void write(WriteBatch writeBatch) throws RocksDBException {
         db.write(ableWalWriteOptions, writeBatch);
+        accountWriteOpsForWalFlush();
+    }
+
+    private void accountWriteOpsForWalFlush() throws RocksDBException {
+        int writeCount = writeOpsCounter.incrementAndGet();
+        if (writeCount >= messageStoreConfig.getRocksdbFlushWalFrequency()) {
+            this.db.flushWal(false);

Review Comment:
Maybe flushWal(true) would be better.
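The remark about setNoSlowdown(false) can be illustrated with a toy model (not RocksDB code). Per RocksDB's WriteOptions documentation, a write with no_slowdown=true that would otherwise have to wait fails immediately with Status::Incomplete; with no_slowdown=false it waits out the stall. Below, a Semaphore's permits stand in for free write capacity; StallModel, Result, and backgroundWorkDone() are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Toy model (hypothetical, not RocksDB code) of the WriteOptions no_slowdown trade-off.
public final class StallModel {

    enum Result { OK, INCOMPLETE, STILL_WAITING, INTERRUPTED }

    private final Semaphore writeCapacity; // permits ~ room left before a write stall

    StallModel(int capacity) {
        this.writeCapacity = new Semaphore(capacity);
    }

    /**
     * noSlowdown=true: fail fast, like Status::Incomplete.
     * noSlowdown=false: wait for capacity; bounded by maxWaitMs only so the demo terminates.
     */
    Result write(boolean noSlowdown, long maxWaitMs) {
        if (noSlowdown) {
            return writeCapacity.tryAcquire() ? Result.OK : Result.INCOMPLETE;
        }
        try {
            return writeCapacity.tryAcquire(maxWaitMs, TimeUnit.MILLISECONDS)
                    ? Result.OK : Result.STILL_WAITING;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Result.INTERRUPTED;
        }
    }

    /** A flush or compaction finishing frees capacity. */
    void backgroundWorkDone() {
        writeCapacity.release();
    }

    public static void main(String[] args) {
        StallModel db = new StallModel(1);
        System.out.println(db.write(true, 0));   // capacity available
        System.out.println(db.write(true, 0));   // stalled: fails fast
        System.out.println(db.write(false, 50)); // stalled: the writer waits (here, at most 50 ms)
        db.backgroundWorkDone();
        System.out.println(db.write(false, 50)); // capacity freed: the waiting-style write proceeds
    }
}
```

Roughly, in the PR's design the waiting variant turns a RocksDB stall into a full group-commit buffer and then back-pressure on dispatch, while the fast-fail variant would instead surface as repeated exceptions in the retry loop.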
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
RongtongJin commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1841497459
## store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java: ##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        boolean interrupted = false;
+        while (!buffer.isEmpty() && !interrupted) {
+            DispatchRequest dispatchRequest = buffer.poll();
+            if (null != dispatchRequest) {
+                requests.add(dispatchRequest);
+            }
+
+            if (requests.isEmpty()) {
+                break;
+            }
+
+            if (null == dispatchRequest || requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) {
+                while (!store.isStopped()) {
+                    try {
+                        // putMessagePosition will clear requests after consume queue building completion
+                        store.putMessagePosition(requests);
+                        break;
+                    } catch (RocksDBException e) {
+                        log.error("Failed to build consume queue in RocksDB", e);
+                    }

Review Comment:
If a RocksDBException keeps being thrown, this becomes an endless loop with no pause between retries. Is that the expected behavior?

## store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java: ##
@@ -500,6 +529,16 @@ public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId)
         return oldLogic != null ? oldLogic : newLogic;
     }

+    @Override
+    public ConcurrentMap<Integer, ConsumeQueueInterface> findConsumeQueueMap(String topic) {
+        if (MixAll.isLmq(topic)) {
+            ConcurrentMap<Integer, ConsumeQueueInterface> result = new ConcurrentHashMap<>(1);
+            result.put(MixAll.LMQ_QUEUE_ID, findOrCreateConsumeQueue(topic, MixAll.LMQ_QUEUE_ID));
+            return result;
+        }
+        return super.findConsumeQueueMap(topic);
+    }

Review Comment:
I don't quite understand why this method needs to be overridden. Could you please clarify?
Re: [PR] fix: multiple patches during long running tests for LMQ over RocksDB [rocketmq]
tianliuliu commented on code in PR #8915: URL: https://github.com/apache/rocketmq/pull/8915#discussion_r1841472095
## store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java: ##
@@ -144,10 +146,8 @@ public static DBOptions createDBOptions() {
             setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
             setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
             setManualWalFlush(true).
-            setMaxTotalWalSize(0).

Review Comment:
Multiple column families share this DB. Maybe you could set MaxTotalWalSize to trigger WAL switching?
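The suggestion rests on how RocksDB retires WAL files when several column families share one DB: a WAL file can be deleted only after every column family with unflushed data in it (or in an older file) has flushed, so a rarely written column family can pin old WAL files. According to RocksDB's option documentation, once total WAL size exceeds max_total_wal_size, RocksDB forces flushes of the column families backed by the oldest live WAL file, and a value of 0 means the limit is chosen dynamically. The toy model below (not RocksDB code; WalCapModel and its one-segment-per-write simplification are hypothetical) shows a cold column family being force-flushed. If writes bypass the WAL, as the reply above says ConsumeQueue's do, the cap has nothing to act on.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model (hypothetical, not RocksDB code): column families sharing one WAL, with a cap on
// total live WAL bytes that forces flushes of the CFs pinning the oldest segment.
public final class WalCapModel {

    private static final class Segment {
        final long bytes;
        final Set<String> unflushedCfs = new HashSet<>();

        Segment(long bytes, String cf) {
            this.bytes = bytes;
            this.unflushedCfs.add(cf);
        }
    }

    private final Deque<Segment> liveSegments = new ArrayDeque<>();
    private final long maxTotalWalBytes;
    private long totalWalBytes;

    WalCapModel(long maxTotalWalBytes) {
        this.maxTotalWalBytes = maxTotalWalBytes;
    }

    /** Logs one write (one segment per write, for simplicity); returns the CFs force-flushed. */
    Set<String> write(String cf, long bytes) {
        liveSegments.addLast(new Segment(bytes, cf));
        totalWalBytes += bytes;
        Set<String> forced = new HashSet<>();
        while (totalWalBytes > maxTotalWalBytes && !liveSegments.isEmpty()) {
            // Flush every CF that still has data in the oldest live segment.
            for (String victim : new HashSet<>(liveSegments.peekFirst().unflushedCfs)) {
                flush(victim);
                forced.add(victim);
            }
        }
        return forced;
    }

    /** Flushing a CF releases its claim on all segments; fully released head segments are deleted. */
    void flush(String cf) {
        for (Segment s : liveSegments) {
            s.unflushedCfs.remove(cf);
        }
        while (!liveSegments.isEmpty() && liveSegments.peekFirst().unflushedCfs.isEmpty()) {
            totalWalBytes -= liveSegments.removeFirst().bytes;
        }
    }

    long totalWalBytes() {
        return totalWalBytes;
    }

    public static void main(String[] args) {
        WalCapModel wal = new WalCapModel(100);
        wal.write("cold", 10); // a single write to a rarely used CF pins the oldest segment
        for (int i = 0; i < 4; i++) {
            Set<String> forced = wal.write("hot", 30);
            wal.flush("hot"); // the hot CF flushes often, yet the WAL cannot shrink past "cold"
            System.out.println("live WAL bytes=" + wal.totalWalBytes() + ", forced=" + forced);
        }
    }
}
```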