rpuch commented on a change in pull request #566: URL: https://github.com/apache/ignite-3/pull/566#discussion_r789593912
########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.util; + +import static java.lang.Boolean.FALSE; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.lang.IgniteInternalCheckedException; + +/** + * Page handler. + * + * @param <X> Type of the arbitrary parameter. + * @param <R> Type of the result. + */ +public interface PageHandler<X, R> { + /** No-op page handler. */ + public static final PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, arg, intArg, statHolder) -> Boolean.TRUE; + + /** + * Handles the page. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. 
Review comment: Would it be possible to create a glossary of what page pointer, page ID and page address are and how they are different (and so on) and put it somewhere (like package javadoc)? Then it would be handy to use the 'see' tag to mention this glossary wherever these terms are used (at least, on the class level). ########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.util; + +import static java.lang.Boolean.FALSE; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.lang.IgniteInternalCheckedException; + +/** + * Page handler. + * + * @param <X> Type of the arbitrary parameter. + * @param <R> Type of the result. 
+ */ +public interface PageHandler<X, R> { Review comment: It seems that this type has at least 2 things combined in it: first, it's the 'handler' itself (some action that might be done with a page); second, all (or almost all) of the static methods here are the 'things that apply that action in certain contexts'. I suggest separating them. Handler part (one-method interface) here, 'action executors' there. A good name needs to be invented for the second part, but I cannot come up with anything good yet... ########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/reuse/ReuseList.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.reuse; + +import org.apache.ignite.internal.pagememory.FullPageId; +import org.apache.ignite.internal.pagememory.PageIdAllocator; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.lang.IgniteInternalCheckedException; +import org.jetbrains.annotations.Nullable; + +/** + * Reuse list. + */ +public interface ReuseList { + /** + * Polls all pages from reuse bag and puts them into reuse list. + * + * @param bag Reuse bag. 
+ * @throws IgniteInternalCheckedException If failed. + */ + void addForRecycle(ReuseBag bag) throws IgniteInternalCheckedException; Review comment: The method name and javadoc description describe different things (probably because they are describing the same action, but from the point of view of different levels, like intention and implementation?). Would it make sense to reconcile them? For example, rename the method to `consumeFully()`, or (if `addForRecycle()` is the most adequate name possible) add this 'add-for-recycle' logic description to the javadoc (in the beginning), and only then put the part about polling all pages? ########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.pagememory.util; + +import static java.lang.Boolean.FALSE; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.lang.IgniteInternalCheckedException; + +/** + * Page handler. + * + * @param <X> Type of the arbitrary parameter. + * @param <R> Type of the result. + */ +public interface PageHandler<X, R> { + /** No-op page handler. */ + public static final PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, arg, intArg, statHolder) -> Boolean.TRUE; + + /** + * Handles the page. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param io IO. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param statHolder Statistics holder to track IO operations. + * @return Result. + * @throws IgniteInternalCheckedException If failed. + */ + public R run( + int groupId, + long pageId, + long page, + long pageAddr, + PageIo io, + X arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException; + + /** + * Checks whether write lock (and acquiring if applicable) should be released after handling. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @return {@code true} If release. + */ + default boolean releaseAfterWrite( + int groupId, + long pageId, + long page, + long pageAddr, + X arg, + int intArg + ) { + return true; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. 
+ * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R readPage( Review comment: Static methods are ok when they are short and 'contextless'. Here, they contain a ton of logic that is core to Ignite. This makes it extremely difficult (almost impossible) to apply unit testing to this code. My suggestion is to make these methods non-static, maybe even turn the class (of handler executors) into an interface. This would allow mocking them, injecting dependencies (and using this property to pass fewer parameters in invocations), and probably something else. ########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.pagememory.util; + +import static java.lang.Boolean.FALSE; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.lang.IgniteInternalCheckedException; + +/** + * Page handler. + * + * @param <X> Type of the arbitrary parameter. + * @param <R> Type of the result. + */ +public interface PageHandler<X, R> { + /** No-op page handler. */ + public static final PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, arg, intArg, statHolder) -> Boolean.TRUE; + + /** + * Handles the page. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param io IO. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param statHolder Statistics holder to track IO operations. + * @return Result. + * @throws IgniteInternalCheckedException If failed. + */ + public R run( + int groupId, + long pageId, + long page, + long pageAddr, + PageIo io, + X arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException; + + /** + * Checks whether write lock (and acquiring if applicable) should be released after handling. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @return {@code true} If release. + */ + default boolean releaseAfterWrite( + int groupId, + long pageId, + long page, + long pageAddr, + X arg, + int intArg + ) { + return true; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. 
+ * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long page = pageMem.acquirePage(groupId, pageId, statHolder); + + try { + return readPage(pageMem, groupId, pageId, page, lsnr, h, arg, intArg, lockFailed, statHolder); + } finally { + pageMem.releasePage(groupId, pageId, page); + } + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. Page must already be acquired. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. 
+ */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long pageAddr = 0L; + + try { + if ((pageAddr = readLock(pageMem, groupId, pageId, page, lsnr)) == 0L) { + return lockFailed; + } + + PageIo io = pageMem.ioRegistry().resolve(pageAddr); + + return h.run(groupId, pageId, page, pageAddr, io, arg, intArg, statHolder); + } finally { + if (pageAddr != 0L) { + readUnlock(pageMem, groupId, pageId, page, pageAddr, lsnr); + } + } + } + + /** + * Acquires the read lock on the page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @return Page address or {@code 0} if acquiring failed. + */ + public static long readLock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr + ) { + lsnr.onBeforeReadLock(groupId, pageId, page); + + long pageAddr = pageMem.readLock(groupId, pageId, page); + + lsnr.onReadLock(groupId, pageId, page, pageAddr); + + return pageAddr; + } + + /** + * Releases acquired read lock. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param lsnr Lock listener. + */ + public static void readUnlock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + long pageAddr, + PageLockListener lsnr + ) { + lsnr.onReadUnlock(groupId, pageId, page, pageAddr); + + pageMem.readUnlock(groupId, pageId, page); + } + + /** + * Initializes a new page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param init IO for new page initialization. + * @param lsnr Lock listener. + * @param statHolder Statistics holder to track IO operations. 
+ * @throws IgniteInternalCheckedException If failed. + * @see PageIo#initNewPage(long, long, int) + */ + public static void initPage( + PageMemory pageMem, + int groupId, + long pageId, + PageIo init, + PageLockListener lsnr, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + Boolean res = writePage( + pageMem, + groupId, + pageId, + lsnr, + PageHandler.NO_OP, + init, + null, + 0, + FALSE, + statHolder + ); + + assert res != FALSE; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. Review comment: In the code, `writeLock()` is taken, should it be 'write lock' in the javadoc? ########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.pagememory.util; + +import static java.lang.Boolean.FALSE; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.lang.IgniteInternalCheckedException; + +/** + * Page handler. + * + * @param <X> Type of the arbitrary parameter. + * @param <R> Type of the result. + */ +public interface PageHandler<X, R> { + /** No-op page handler. */ + public static final PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, arg, intArg, statHolder) -> Boolean.TRUE; + + /** + * Handles the page. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param io IO. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param statHolder Statistics holder to track IO operations. + * @return Result. + * @throws IgniteInternalCheckedException If failed. + */ + public R run( + int groupId, + long pageId, + long page, + long pageAddr, + PageIo io, + X arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException; + + /** + * Checks whether write lock (and acquiring if applicable) should be released after handling. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @return {@code true} If release. + */ + default boolean releaseAfterWrite( + int groupId, + long pageId, + long page, + long pageAddr, + X arg, + int intArg + ) { + return true; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. 
+ * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long page = pageMem.acquirePage(groupId, pageId, statHolder); + + try { + return readPage(pageMem, groupId, pageId, page, lsnr, h, arg, intArg, lockFailed, statHolder); + } finally { + pageMem.releasePage(groupId, pageId, page); + } + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. Page must already be acquired. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. 
+ */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long pageAddr = 0L; + + try { + if ((pageAddr = readLock(pageMem, groupId, pageId, page, lsnr)) == 0L) { + return lockFailed; + } + + PageIo io = pageMem.ioRegistry().resolve(pageAddr); + + return h.run(groupId, pageId, page, pageAddr, io, arg, intArg, statHolder); + } finally { + if (pageAddr != 0L) { + readUnlock(pageMem, groupId, pageId, page, pageAddr, lsnr); + } + } + } + + /** + * Acquires the read lock on the page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @return Page address or {@code 0} if acquiring failed. + */ + public static long readLock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr + ) { + lsnr.onBeforeReadLock(groupId, pageId, page); + + long pageAddr = pageMem.readLock(groupId, pageId, page); + + lsnr.onReadLock(groupId, pageId, page, pageAddr); + + return pageAddr; + } + + /** + * Releases acquired read lock. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param lsnr Lock listener. + */ + public static void readUnlock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + long pageAddr, + PageLockListener lsnr + ) { + lsnr.onReadUnlock(groupId, pageId, page, pageAddr); + + pageMem.readUnlock(groupId, pageId, page); + } + + /** + * Initializes a new page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param init IO for new page initialization. + * @param lsnr Lock listener. + * @param statHolder Statistics holder to track IO operations. 
+ * @throws IgniteInternalCheckedException If failed. + * @see PageIo#initNewPage(long, long, int) + */ + public static void initPage( + PageMemory pageMem, + int groupId, + long pageId, + PageIo init, + PageLockListener lsnr, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + Boolean res = writePage( + pageMem, + groupId, + pageId, + lsnr, + PageHandler.NO_OP, + init, + null, + 0, + FALSE, + statHolder + ); + + assert res != FALSE; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R writePage( + PageMemory pageMem, + int groupId, + final long pageId, + PageLockListener lsnr, + PageHandler<X, R> h, + PageIo init, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + boolean releaseAfterWrite = true; + long page = pageMem.acquirePage(groupId, pageId, statHolder); + try { + long pageAddr = writeLock(pageMem, groupId, pageId, page, lsnr, false); + + if (pageAddr == 0L) { + return lockFailed; + } + + boolean ok = false; + + try { + if (init != null) { + // It is a new page and we have to initialize it. 
+ doInitPage(pageMem, groupId, pageId, pageAddr, init); + } else { + init = pageMem.ioRegistry().resolve(pageAddr); + } + + R res = h.run(groupId, pageId, page, pageAddr, init, arg, intArg, statHolder); + + ok = true; + + return res; + } finally { + assert PageIo.getCrc(pageAddr) == 0; + + if (releaseAfterWrite = h.releaseAfterWrite(groupId, pageId, page, pageAddr, arg, intArg)) { + writeUnlock(pageMem, groupId, pageId, page, pageAddr, lsnr, ok); + } + } + } finally { + if (releaseAfterWrite) { + pageMem.releasePage(groupId, pageId, page); + } + } + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. Page must already be acquired. Review comment: Write lock? ########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.pagememory.util; + +import static java.lang.Boolean.FALSE; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.lang.IgniteInternalCheckedException; + +/** + * Page handler. + * + * @param <X> Type of the arbitrary parameter. + * @param <R> Type of the result. + */ +public interface PageHandler<X, R> { + /** No-op page handler. */ + public static final PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, arg, intArg, statHolder) -> Boolean.TRUE; + + /** + * Handles the page. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param io IO. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param statHolder Statistics holder to track IO operations. + * @return Result. + * @throws IgniteInternalCheckedException If failed. + */ + public R run( + int groupId, + long pageId, + long page, + long pageAddr, + PageIo io, + X arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException; + + /** + * Checks whether write lock (and acquiring if applicable) should be released after handling. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @return {@code true} If release. + */ + default boolean releaseAfterWrite( + int groupId, + long pageId, + long page, + long pageAddr, + X arg, + int intArg + ) { + return true; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. 
+ * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long page = pageMem.acquirePage(groupId, pageId, statHolder); + + try { + return readPage(pageMem, groupId, pageId, page, lsnr, h, arg, intArg, lockFailed, statHolder); + } finally { + pageMem.releasePage(groupId, pageId, page); + } + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. Page must already be acquired. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. 
+ */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long pageAddr = 0L; + + try { + if ((pageAddr = readLock(pageMem, groupId, pageId, page, lsnr)) == 0L) { + return lockFailed; + } + + PageIo io = pageMem.ioRegistry().resolve(pageAddr); + + return h.run(groupId, pageId, page, pageAddr, io, arg, intArg, statHolder); + } finally { + if (pageAddr != 0L) { + readUnlock(pageMem, groupId, pageId, page, pageAddr, lsnr); + } + } + } + + /** + * Acquires the read lock on the page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @return Page address or {@code 0} if acquiring failed. + */ + public static long readLock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr + ) { + lsnr.onBeforeReadLock(groupId, pageId, page); + + long pageAddr = pageMem.readLock(groupId, pageId, page); + + lsnr.onReadLock(groupId, pageId, page, pageAddr); + + return pageAddr; + } + + /** + * Releases acquired read lock. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param lsnr Lock listener. + */ + public static void readUnlock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + long pageAddr, + PageLockListener lsnr + ) { + lsnr.onReadUnlock(groupId, pageId, page, pageAddr); + + pageMem.readUnlock(groupId, pageId, page); + } + + /** + * Initializes a new page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param init IO for new page initialization. + * @param lsnr Lock listener. + * @param statHolder Statistics holder to track IO operations. 
+ * @throws IgniteInternalCheckedException If failed. + * @see PageIo#initNewPage(long, long, int) + */ + public static void initPage( + PageMemory pageMem, + int groupId, + long pageId, + PageIo init, + PageLockListener lsnr, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + Boolean res = writePage( + pageMem, + groupId, + pageId, + lsnr, + PageHandler.NO_OP, + init, + null, + 0, + FALSE, + statHolder + ); + + assert res != FALSE; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R writePage( + PageMemory pageMem, + int groupId, + final long pageId, + PageLockListener lsnr, + PageHandler<X, R> h, + PageIo init, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + boolean releaseAfterWrite = true; + long page = pageMem.acquirePage(groupId, pageId, statHolder); + try { + long pageAddr = writeLock(pageMem, groupId, pageId, page, lsnr, false); + + if (pageAddr == 0L) { + return lockFailed; + } + + boolean ok = false; + + try { + if (init != null) { Review comment: So we signal that the page needs to be initialized by passing `null` in the `init` argument? Or is there some invariant here?
########## File path: modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/util/PageHandler.java ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.util; + +import static java.lang.Boolean.FALSE; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagememory.PageMemory; +import org.apache.ignite.internal.pagememory.io.PageIo; +import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.lang.IgniteInternalCheckedException; + +/** + * Page handler. + * + * @param <X> Type of the arbitrary parameter. + * @param <R> Type of the result. + */ +public interface PageHandler<X, R> { + /** No-op page handler. */ + public static final PageHandler<Void, Boolean> NO_OP = (groupId, pageId, page, pageAddr, io, arg, intArg, statHolder) -> Boolean.TRUE; + + /** + * Handles the page. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param io IO. + * @param arg Argument. + * @param intArg Argument of type {@code int}. 
+ * @param statHolder Statistics holder to track IO operations. + * @return Result. + * @throws IgniteInternalCheckedException If failed. + */ + public R run( + int groupId, + long pageId, + long page, + long pageAddr, + PageIo io, + X arg, + int intArg, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException; + + /** + * Checks whether write lock (and acquiring if applicable) should be released after handling. + * + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @return {@code true} If release. + */ + default boolean releaseAfterWrite( + int groupId, + long pageId, + long page, + long pageAddr, + X arg, + int intArg + ) { + return true; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long page = pageMem.acquirePage(groupId, pageId, statHolder); + + try { + return readPage(pageMem, groupId, pageId, page, lsnr, h, arg, intArg, lockFailed, statHolder); + } finally { + pageMem.releasePage(groupId, pageId, page); + } + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. Page must already be acquired. 
+ * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. + */ + public static <X, R> R readPage( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr, + PageHandler<X, R> h, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long pageAddr = 0L; + + try { + if ((pageAddr = readLock(pageMem, groupId, pageId, page, lsnr)) == 0L) { + return lockFailed; + } + + PageIo io = pageMem.ioRegistry().resolve(pageAddr); + + return h.run(groupId, pageId, page, pageAddr, io, arg, intArg, statHolder); + } finally { + if (pageAddr != 0L) { + readUnlock(pageMem, groupId, pageId, page, pageAddr, lsnr); + } + } + } + + /** + * Acquires the read lock on the page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @return Page address or {@code 0} if acquiring failed. + */ + public static long readLock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr + ) { + lsnr.onBeforeReadLock(groupId, pageId, page); + + long pageAddr = pageMem.readLock(groupId, pageId, page); + + lsnr.onReadLock(groupId, pageId, page, pageAddr); + + return pageAddr; + } + + /** + * Releases acquired read lock. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param lsnr Lock listener. 
+ */ + public static void readUnlock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + long pageAddr, + PageLockListener lsnr + ) { + lsnr.onReadUnlock(groupId, pageId, page, pageAddr); + + pageMem.readUnlock(groupId, pageId, page); + } + + /** + * Initializes a new page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param init IO for new page initialization. + * @param lsnr Lock listener. + * @param statHolder Statistics holder to track IO operations. + * @throws IgniteInternalCheckedException If failed. + * @see PageIo#initNewPage(long, long, int) + */ + public static void initPage( + PageMemory pageMem, + int groupId, + long pageId, + PageIo init, + PageLockListener lsnr, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + Boolean res = writePage( + pageMem, + groupId, + pageId, + lsnr, + PageHandler.NO_OP, + init, + null, + 0, + FALSE, + statHolder + ); + + assert res != FALSE; + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param lsnr Lock listener. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. 
+ */ + public static <X, R> R writePage( + PageMemory pageMem, + int groupId, + final long pageId, + PageLockListener lsnr, + PageHandler<X, R> h, + PageIo init, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + boolean releaseAfterWrite = true; + long page = pageMem.acquirePage(groupId, pageId, statHolder); + try { + long pageAddr = writeLock(pageMem, groupId, pageId, page, lsnr, false); + + if (pageAddr == 0L) { + return lockFailed; + } + + boolean ok = false; + + try { + if (init != null) { + // It is a new page and we have to initialize it. + doInitPage(pageMem, groupId, pageId, pageAddr, init); + } else { + init = pageMem.ioRegistry().resolve(pageAddr); + } + + R res = h.run(groupId, pageId, page, pageAddr, init, arg, intArg, statHolder); + + ok = true; + + return res; + } finally { + assert PageIo.getCrc(pageAddr) == 0; + + if (releaseAfterWrite = h.releaseAfterWrite(groupId, pageId, page, pageAddr, arg, intArg)) { + writeUnlock(pageMem, groupId, pageId, page, pageAddr, lsnr, ok); + } + } + } finally { + if (releaseAfterWrite) { + pageMem.releasePage(groupId, pageId, page); + } + } + } + + /** + * Executes handler under the read lock or returns {@code lockFailed} if lock failed. Page must already be acquired. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @param h Handler. + * @param init IO for new page initialization or {@code null} if it is an existing page. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param lockFailed Result in case of lock failure due to page recycling. + * @param statHolder Statistics holder to track IO operations. + * @return Handler result. + * @throws IgniteInternalCheckedException If failed. 
+ */ + public static <X, R> R writePage( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr, + PageHandler<X, R> h, + PageIo init, + X arg, + int intArg, + R lockFailed, + IoStatisticsHolder statHolder + ) throws IgniteInternalCheckedException { + long pageAddr = writeLock(pageMem, groupId, pageId, page, lsnr, false); + + if (pageAddr == 0L) { + return lockFailed; + } + + boolean ok = false; + + try { + if (init != null) { + // It is a new page and we have to initialize it. + doInitPage(pageMem, groupId, pageId, pageAddr, init); + } else { + init = pageMem.ioRegistry().resolve(pageAddr); + } + + R res = h.run(groupId, pageId, page, pageAddr, init, arg, intArg, statHolder); + + ok = true; + + return res; + } finally { + assert PageIo.getCrc(pageAddr) == 0; + + if (h.releaseAfterWrite(groupId, pageId, page, pageAddr, arg, intArg)) { + writeUnlock(pageMem, groupId, pageId, page, pageAddr, lsnr, ok); + } + } + } + + /** + * Acquires the write lock on the page. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param lsnr Lock listener. + * @param tryLock Only try to lock without waiting. + * @return Page address or {@code 0} if failed to lock due to recycling. + */ + public static long writeLock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + PageLockListener lsnr, + boolean tryLock + ) { + lsnr.onBeforeWriteLock(groupId, pageId, page); + + long pageAddr = tryLock ? pageMem.tryWriteLock(groupId, pageId, page) : pageMem.writeLock(groupId, pageId, page); + + lsnr.onWriteLock(groupId, pageId, page, pageAddr); + + return pageAddr; + } + + /** + * Releases acquired write lock. + * + * @param pageMem Page memory. + * @param groupId Group ID. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param lsnr Lock listener. + * @param dirty Page is dirty. 
+ */ + public static void writeUnlock( + PageMemory pageMem, + int groupId, + long pageId, + long page, + long pageAddr, + PageLockListener lsnr, + boolean dirty + ) { + lsnr.onWriteUnlock(groupId, pageId, page, pageAddr); + + pageMem.writeUnlock(groupId, pageId, page, dirty); + } + + /** + * Invokes {@link PageIo#initNewPage(long, long, int)} and does additional checks. + */ + private static void doInitPage( + PageMemory pageMem, + int groupId, + long pageId, + long pageAddr, + PageIo init + ) throws IgniteInternalCheckedException { + assert PageIo.getCrc(pageAddr) == 0; + + init.initNewPage(pageAddr, pageId, pageMem.realPageSize(groupId)); + } + + /** + * Copies memory from one buffer to another. + * + * @param src Source. + * @param srcOff Source offset in bytes. + * @param dst Destination. + * @param dstOff Destination offset in bytes. + * @param cnt Bytes count to copy. + */ + public static void copyMemory(ByteBuffer src, long srcOff, ByteBuffer dst, long dstOff, long cnt) { + byte[] srcArr = src.hasArray() ? src.array() : null; Review comment: Why is this method here? It does not seem to belong to 'page handling'; it's more about copying. I suggest creating a separate util class and moving the method there. The same applies to the next method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
