jt2594838 commented on code in PR #14329:
URL: https://github.com/apache/iotdb/pull/14329#discussion_r1877048931
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java:
##########
@@ -334,6 +334,15 @@ public void loadProperties(TrimProperties properties)
throws BadNodeUrlException
.map(String::trim)
.orElse(Double.toString(conf.getRejectProportion())));
+ final double walBufferQueueProportion =
+ Double.parseDouble(
+ Optional.ofNullable(
+ properties.getProperty(
+ "wal_buffer_queue_proportion",
+ Double.toString(conf.getWalBufferQueueProportion())))
+ .map(String::trim)
+ .orElse(Double.toString(conf.getWalBufferQueueProportion())));
+
Review Comment:
No need to trim again when using TrimProperties.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class WALEntryQueue {
+
+ private final BlockingQueue<WALEntry> queue;
+ private final Object lock = new Object();
Review Comment:
Maybe nonFullCondition is more precise?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class WALEntryQueue {
+
+ private final BlockingQueue<WALEntry> queue;
+ private final Object lock = new Object();
+
+ public WALEntryQueue() {
+ queue = new LinkedBlockingQueue<>();
+ }
+
+ public WALEntry poll(long timeout, TimeUnit unit) throws
InterruptedException {
+ WALEntry e = queue.poll(timeout, unit);
+ if (e != null) {
+ SystemInfo.getInstance().updateWalQueueMemoryCost(-getElementSize(e));
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+ return e;
+ }
+
+ public void put(WALEntry e) throws InterruptedException {
+ long elementSize = getElementSize(e);
+ synchronized (lock) {
+ while
(SystemInfo.getInstance().cannotReserveMemoryForWalEntry(elementSize)) {
+ lock.wait();
+ }
+ }
+ queue.put(e);
+ SystemInfo.getInstance().updateWalQueueMemoryCost(elementSize);
+ synchronized (lock) {
+ lock.notifyAll();
+ }
Review Comment:
Is this notification necessary? Adding a new element does not seem to
unblock any callers.
##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -742,6 +742,11 @@ buffered_arrays_memory_proportion=0.6
# Datatype: double
reject_proportion=0.8
+# Ratio of max memory size for the WAL buffer queue, 0.1 by default
+# effectiveMode: restart
Review Comment:
Ratio to what (whose 10%)? Should be more precise.
##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -1402,7 +1407,7 @@ wal_sync_mode_fsync_delay_in_ms=3
# Datatype: int
wal_buffer_size_in_byte=33554432
-# Blocking queue capacity of each wal buffer, restricts maximum number of
WALEdits cached in the blocking queue.
+# Blocking queue capacity of each PageCacheDeletionBuffer, restricts maximum
number of entry in the blocking queue.
# effectiveMode: restart
# Datatype: int
wal_buffer_queue_capacity=500
Review Comment:
The config should be renamed.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryQueue.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
+
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
+import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class WALEntryQueue {
Review Comment:
Maybe we can abstract it as MemoryControlledBlockingQueue<T extends
Accountable> in the future, which may help in other places.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]