ibessonov commented on code in PR #920:
URL: https://github.com/apache/ignite-3/pull/920#discussion_r920950426
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -81,22 +80,36 @@ public void start() throws StorageException {
protected PersistentPageMemoryPartitionStorage createPartitionStorage(int
partId) throws StorageException {
TableView tableView = tableCfg.value();
- FilePageStore partitionFilePageStore =
ensurePartitionFilePageStore(tableView, partId);
+ PartitionFilePageStore partitionFilePageStore =
ensurePartitionFilePageStore(tableView, partId);
CheckpointTimeoutLock checkpointTimeoutLock =
dataRegion.checkpointManager().checkpointTimeoutLock();
checkpointTimeoutLock.checkpointReadLock();
try {
- PartitionMeta partitionMeta = getOrCreatePartitionMeta(tableView,
partId, partitionFilePageStore);
+ PersistentPageMemory persistentPageMemory =
dataRegion.pageMemory();
- TableFreeList tableFreeList = createTableFreeList(tableView,
partId, partitionMeta);
+ int grpId = groupId(tableView);
+
+ PartitionMeta meta = partitionFilePageStore.meta();
+
+ if (meta.isCreated()) {
Review Comment:
Why do we need an "isCreated" flag? It makes absolutely no sense. You can
compare stored ids with default values instead.
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -226,8 +169,8 @@ TableFreeList createTableFreeList(
partId,
dataRegion.pageMemory(),
PageLockListenerNoOp.INSTANCE,
- partitionMeta.reuseListRoot.pageId(),
- partitionMeta.allocated,
+ partitionMeta.reuseListRootPageId(),
+ partitionMeta.isCreated(),
Review Comment:
What's the purpose of this flag? Can we remove it as well?
I'd prefer not having "isCreated" method at all.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PartitionFilePageStore.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * File page store with partition meta.
+ */
+public class PartitionFilePageStore implements PageStore {
+ private final FilePageStore filePageStore;
+
+ private final PartitionMeta partitionMeta;
+
+ /**
+ * Constructor.
+ *
+ * @param filePageStore File page store.
+ * @param partitionMeta Partition meta.
+ */
+ public PartitionFilePageStore(FilePageStore filePageStore, PartitionMeta
partitionMeta) {
+ this.filePageStore = filePageStore;
+ this.partitionMeta = partitionMeta;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void addWriteListener(PageWriteListener listener) {
+ filePageStore.addWriteListener(listener);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void removeWriteListener(PageWriteListener listener) {
+ filePageStore.removeWriteListener(listener);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop(boolean clean) throws IgniteInternalCheckedException {
+ filePageStore.stop(clean);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long allocatePage() throws IgniteInternalCheckedException {
+ return filePageStore.allocatePage();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int pages() {
+ return filePageStore.pages();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc)
throws IgniteInternalCheckedException {
+ return filePageStore.read(pageId, pageBuf, keepCrc);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(long pageId, ByteBuffer pageBuf, int tag, boolean
calculateCrc) throws IgniteInternalCheckedException {
+ filePageStore.write(pageId, pageBuf, tag, calculateCrc);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void sync() throws IgniteInternalCheckedException {
+ filePageStore.sync();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean exists() {
+ return filePageStore.exists();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void ensure() throws IgniteInternalCheckedException {
+ filePageStore.ensure();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws IOException {
+ filePageStore.close();
+ }
+
+ /**
+ * Returns partition meta.
+ */
+ public PartitionMeta meta() {
Review Comment:
This method is called at the end of a checkpoint, right? How can you be sure
that the object is not modified?
I don't see any comments about meta snapshots.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PartitionFilePageStoreFactory.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreHeader.readHeader;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
+import org.apache.ignite.internal.pagememory.persistence.io.PartitionMetaIo;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Factory for creating {@link PartitionFilePageStore}.
+ */
+class PartitionFilePageStoreFactory {
+ /** Latest file page store version. */
+ public final int latestVersion = FilePageStore.VERSION_1;
+
+ private final FileIoFactory fileIoFactory;
+
+ private final int pageSize;
+
+ private PageIoRegistry ioRegistry;
+
+ /**
+ * Constructor.
+ *
+ * @param fileIoFactory File IO factory.
+ * @param ioRegistry Page IO registry.
+ * @param pageSize Page size in bytes.
+ */
+ public PartitionFilePageStoreFactory(
+ FileIoFactory fileIoFactory,
+ PageIoRegistry ioRegistry,
+ int pageSize
+ ) {
+ this.fileIoFactory = fileIoFactory;
+ this.ioRegistry = ioRegistry;
+ this.pageSize = pageSize;
+ }
+
+ /**
+ * Creates instance of {@link PartitionFilePageStore}.
+ *
+ * <p>If the partition file exists, it will read its {@link
FilePageStoreHeader header} and {@link PartitionMetaIo meta} ({@code
+ * pageIdx == 0}) to create the {@link PartitionFilePageStore}.
+ *
+ * @param filePath File page store path.
+ * @param readIntoBuffer Direct byte buffer for reading {@link
PartitionMetaIo meta} from {@code filePath}.
+ * @return File page store.
+ * @throws IgniteInternalCheckedException if failed
+ */
+ public PartitionFilePageStore createPageStore(
+ Path filePath,
+ ByteBuffer readIntoBuffer
+ ) throws IgniteInternalCheckedException {
+ assert readIntoBuffer.remaining() == pageSize : "Expected pageSize=" +
pageSize + ", bufferSize=" + readIntoBuffer.remaining();
+
+ if (!Files.exists(filePath)) {
+ return createPageStore(filePath, new
FilePageStoreHeader(latestVersion, pageSize), newPartitionMeta());
+ }
+
+ try (FileIo fileIo = fileIoFactory.create(filePath)) {
+ FilePageStoreHeader header = readHeader(fileIo);
+
+ if (header == null) {
+ header = new FilePageStoreHeader(latestVersion, pageSize);
+ }
+
+ PartitionMeta meta = readPartitionMeta(fileIo,
header.headerSize(), readIntoBuffer);
+
+ return createPageStore(filePath, header, meta);
+ } catch (IOException e) {
+ throw new IgniteInternalCheckedException("Error while creating
file page store [file=" + filePath + "]:", e);
+ }
+ }
+
+ private PartitionFilePageStore createPageStore(
+ Path filePath,
+ FilePageStoreHeader header,
+ PartitionMeta partitionMeta
+ ) throws IgniteInternalCheckedException {
+ if (header.version() == FilePageStore.VERSION_1) {
+ return new PartitionFilePageStore(
+ new FilePageStore(
+ header.version(),
+ header.pageSize(),
+ header.headerSize(),
+ partitionMeta.pageCount(),
+ filePath,
+ fileIoFactory
+ ),
+ partitionMeta
+ );
+ }
+
+ throw new IgniteInternalCheckedException(String.format(
+ "Unknown version of file page store [version=%s, file=%s]",
+ header.version(),
+ filePath
+ ));
+ }
+
+ private PartitionMeta readPartitionMeta(FileIo fileIo, int headerSize,
ByteBuffer readIntoBuffer) throws IOException {
+ if (fileIo.size() <= headerSize) {
+ return newPartitionMeta();
+ }
+
+ try {
+ fileIo.readFully(readIntoBuffer, headerSize);
+
+ long pageAddr = bufferAddress(readIntoBuffer);
+
+ if (PartitionMetaIo.getType(pageAddr) <= 0) {
+ return newPartitionMeta();
+ }
+
+ PartitionMetaIo partitionMetaIo = ioRegistry.resolve(pageAddr);
+
+ return new PartitionMeta(
Review Comment:
Can't we encapsulate this as well? I don't like that all methods of IO are
explicitly listed somewhere outside of IO. Someone will forget to add a line here
while modifying the Meta class, I promise you.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -212,4 +241,53 @@ public void writePage(FullPageId fullPageId, ByteBuffer
buf, int tag) throws Ign
}
};
}
+
+ /**
+ * Updates the partition {@link PartitionMetaIo} (pageIdx == 0).
Review Comment:
"Page IO" is not a synonym for "Page"
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -161,109 +164,24 @@ public CheckpointDirtyPagesQueue toQueue() {
return null;
}
- List<FullPageId> pageIds = dirtyPages.get(regionIndex).getValue();
+ FullPageId[] pageIds = dirtyPages.get(regionIndex).dirtyPages.pageIds;
Review Comment:
Comment for the code above: the name "isToPositionLast" is very confusing.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -59,7 +66,10 @@ public class CheckpointPagesWriter implements Runnable {
* <p>Overall pages to write may be greater than this queue, since it may
be necessary to retire write some pages due to unsuccessful
* page write lock acquisition
*/
- private final CheckpointDirtyPagesQueue writePageIds;
+ private final IgniteConcurrentMultiPairQueue<PersistentPageMemory,
FullPageId> writePageIds;
+
+ /** Queue of dirty partition IDs to update {@link PartitionMetaIo} under
this task. */
Review Comment:
What do you mean by "update PartitionMetaIo"? IO classes are stateless, they
can't be updated
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -2056,39 +2076,39 @@ public FullPageId pullPageFromCpBuffer() {
}
/**
- * Gets a collection of dirty page IDs since the last checkpoint. If a
dirty page is being written after the checkpointing operation
- * begun, the modifications will be written to a temporary buffer which
will be flushed to the main memory after the checkpointing
- * finished. This method must be called when no concurrent operations on
pages are performed.
+ * Returns the container of dirty page IDs and partition IDs since the
last checkpoint. If a dirty page is being written after the
+ * checkpointing operation begun, the modifications will be written to a
temporary buffer which will be flushed to the main memory after
+ * the checkpointing finished. This method must be called when no
concurrent operations on pages are performed.
*
* @param allowToReplace The sign which allows replacing pages from a
checkpoint by page replacer.
- * @return Collection view of dirty page IDs.
* @throws IgniteInternalException If checkpoint has been already started
and was not finished.
*/
- public Collection<FullPageId> beginCheckpoint(CompletableFuture<?>
allowToReplace) throws IgniteInternalException {
+ public CollectionDirtyPages beginCheckpoint(CompletableFuture<?>
allowToReplace) throws IgniteInternalException {
if (segments == null) {
- return List.of();
+ return new CollectionDirtyPages(List.of(), List.of());
}
- Collection<FullPageId>[] collections = new Collection[segments.length];
+ Set<FullPageId>[] dirtyPageIds = new Set[segments.length];
+
+ Set<GroupPartitionId> dirtyPartitionIds = new HashSet<>();
for (int i = 0; i < segments.length; i++) {
- Segment seg = segments[i];
+ Segment segment = segments[i];
- if (seg.checkpointPages != null) {
- throw new IgniteInternalException("Failed to begin checkpoint
(it is already in progress).");
- }
+ assert segment.checkpointPages == null : "Failed to begin
checkpoint (it is already in progress)";
+
+ Set<FullPageId> segmentDirtyPages = (dirtyPageIds[i] =
segment.dirtyPages);
- Set<FullPageId> dirtyPages = seg.dirtyPages;
- collections[i] = dirtyPages;
+ dirtyPartitionIds.addAll(segment.dirtyPartitions);
Review Comment:
You see, now it's required to manually merge these collections while holding
a write lock. Maybe it's not a big deal.
Am I right that this collection is temporary and we have plans to remove it
once we implement a "faster" way of taking partitions meta snapshots? If so,
then please reflect this in a comment somewhere.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -212,4 +241,53 @@ public void writePage(FullPageId fullPageId, ByteBuffer
buf, int tag) throws Ign
}
};
}
+
+ /**
+ * Updates the partition {@link PartitionMetaIo} (pageIdx == 0).
+ *
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ private void updatePartitions() throws IgniteInternalCheckedException {
+ Result<PersistentPageMemory, GroupPartitionId> queueResult = new
Result<>();
+
+ while (!shutdownNow.getAsBoolean() &&
updatePartitionIds.next(queueResult)) {
+ GroupPartitionId partitionId = queueResult.getValue();
+
+ PartitionFilePageStore partitionFilePageStore =
partitionFilePageStoreManager.getStore(
+ partitionId.getGroupId(),
+ partitionId.getPartitionId()
+ );
+
+ ByteBuffer buffer = threadBuf.get();
+
+ PersistentPageMemory pageMemory = queueResult.getKey();
+
+ long partitionMetaPageId =
pageMemory.partitionMetaPageId(partitionId.getGroupId(),
partitionId.getPartitionId());
+
+ long pageAddr = GridUnsafe.bufferAddress(buffer);
+
+ // TODO: IGNITE-16657 Merger will put the page into a partition
file.
+ PartitionMetaIo io = PartitionMetaIo.VERSIONS.latest();
+
+ io.initNewPage(pageAddr, partitionMetaPageId, buffer.capacity());
+
+ PartitionMeta meta = partitionFilePageStore.meta();
Review Comment:
Please rename the method "meta"; the name is not very clear. Also, I thought that we
would not store meta directly in the page store; why did you decide to do this
anyway?
##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryStorageIoModule.java:
##########
@@ -38,8 +37,7 @@ public Collection<IoVersions<?>> ioVersions() {
TableMetaIo.VERSIONS,
TableInnerIo.VERSIONS,
TableLeafIo.VERSIONS,
- TableDataIo.VERSIONS,
- PartitionMetaIo.VERSIONS
+ TableDataIo.VERSIONS
Review Comment:
So, the partition is now outside of the table module, but it's aware of "rootId";
that's weird.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -279,56 +288,69 @@ public List<CheckpointListener>
collectCheckpointListeners(Collection<? extends
private DataRegionsDirtyPages beginCheckpoint(
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
CompletableFuture<?> allowToReplace
- ) {
- Collection<IgniteBiTuple<PersistentPageMemory,
Collection<FullPageId>>> pages = new ArrayList<>(dataRegions.size());
+ ) throws IgniteInternalCheckedException {
+ Collection<DataRegionDirtyPages<CollectionDirtyPages>>
dataRegionsDirtyPages = new ArrayList<>(dataRegions.size());
for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
- Collection<FullPageId> dirtyPages =
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
+ CollectionDirtyPages dirtyPages =
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
- pages.add(new IgniteBiTuple<>(dataRegion.pageMemory(),
dirtyPages));
+ dataRegionsDirtyPages.add(new
DataRegionDirtyPages<>(dataRegion.pageMemory(), dirtyPages));
}
- return new DataRegionsDirtyPages(pages);
+ for (DataRegionDirtyPages<CollectionDirtyPages> dataRegionDirtyPages :
dataRegionsDirtyPages) {
+ for (GroupPartitionId dirtyPartition :
dataRegionDirtyPages.dirtyPages.partitionIds()) {
+
partitionFilePageStoreManager.getStore(dirtyPartition.getGroupId(),
dirtyPartition.getPartitionId()).updateMetaPageCount();
+ }
+ }
+
+ return new DataRegionsDirtyPages(dataRegionsDirtyPages);
}
CheckpointDirtyPages createAndSortCheckpointDirtyPages(
DataRegionsDirtyPages dataRegionsDirtyPages
) throws IgniteInternalCheckedException {
- List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>>
checkpointPages = new ArrayList<>();
+ List<DataRegionDirtyPages<ArrayDirtyPages>> checkpointDirtyPages = new
ArrayList<>();
int realPagesArrSize = 0;
- for (IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>>
regionDirtyPages : dataRegionsDirtyPages.dirtyPages) {
- FullPageId[] checkpointRegionDirtyPages = new
FullPageId[regionDirtyPages.getValue().size()];
+ for (DataRegionDirtyPages<CollectionDirtyPages> dataRegionDirtyPages :
dataRegionsDirtyPages.dirtyPages) {
+ FullPageId[] pageIds = new
FullPageId[dataRegionDirtyPages.dirtyPages.pageIds().size()];
int pagePos = 0;
- for (FullPageId dirtyPage : regionDirtyPages.getValue()) {
+ for (FullPageId dirtyPage :
dataRegionDirtyPages.dirtyPages.pageIds()) {
assert realPagesArrSize++ !=
dataRegionsDirtyPages.dirtyPageCount :
"Incorrect estimated dirty pages number: " +
dataRegionsDirtyPages.dirtyPageCount;
- checkpointRegionDirtyPages[pagePos++] = dirtyPage;
+ pageIds[pagePos++] = dirtyPage;
Review Comment:
What if we use arraycopy here somehow? :) Or maybe we should just believe in
JIT
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -212,4 +241,53 @@ public void writePage(FullPageId fullPageId, ByteBuffer
buf, int tag) throws Ign
}
};
}
+
+ /**
+ * Updates the partition {@link PartitionMetaIo} (pageIdx == 0).
+ *
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ private void updatePartitions() throws IgniteInternalCheckedException {
+ Result<PersistentPageMemory, GroupPartitionId> queueResult = new
Result<>();
+
+ while (!shutdownNow.getAsBoolean() &&
updatePartitionIds.next(queueResult)) {
+ GroupPartitionId partitionId = queueResult.getValue();
+
+ PartitionFilePageStore partitionFilePageStore =
partitionFilePageStoreManager.getStore(
+ partitionId.getGroupId(),
+ partitionId.getPartitionId()
+ );
+
+ ByteBuffer buffer = threadBuf.get();
+
+ PersistentPageMemory pageMemory = queueResult.getKey();
+
+ long partitionMetaPageId =
pageMemory.partitionMetaPageId(partitionId.getGroupId(),
partitionId.getPartitionId());
+
+ long pageAddr = GridUnsafe.bufferAddress(buffer);
+
+ // TODO: IGNITE-16657 Merger will put the page into a partition
file.
+ PartitionMetaIo io = PartitionMetaIo.VERSIONS.latest();
+
+ io.initNewPage(pageAddr, partitionMetaPageId, buffer.capacity());
+
+ PartitionMeta meta = partitionFilePageStore.meta();
+
+ io.setTreeRootPageId(pageAddr, meta.treeRootPageId());
Review Comment:
Dude, what is happening. This must be encapsulated!
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreHeader.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import static java.nio.ByteOrder.nativeOrder;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.fileio.FileIo;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link FilePageStore} header.
+ *
+ * <p>Total length in bytes {@link #headerSize()}.
+ *
+ * <ul>
+ * <li>{@link #SIGNATURE signature} (8 byte)</li>
Review Comment:
8 byte**s**
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -279,56 +288,69 @@ public List<CheckpointListener>
collectCheckpointListeners(Collection<? extends
private DataRegionsDirtyPages beginCheckpoint(
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
CompletableFuture<?> allowToReplace
- ) {
- Collection<IgniteBiTuple<PersistentPageMemory,
Collection<FullPageId>>> pages = new ArrayList<>(dataRegions.size());
+ ) throws IgniteInternalCheckedException {
+ Collection<DataRegionDirtyPages<CollectionDirtyPages>>
dataRegionsDirtyPages = new ArrayList<>(dataRegions.size());
for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
- Collection<FullPageId> dirtyPages =
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
+ CollectionDirtyPages dirtyPages =
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
- pages.add(new IgniteBiTuple<>(dataRegion.pageMemory(),
dirtyPages));
+ dataRegionsDirtyPages.add(new
DataRegionDirtyPages<>(dataRegion.pageMemory(), dirtyPages));
}
- return new DataRegionsDirtyPages(pages);
+ for (DataRegionDirtyPages<CollectionDirtyPages> dataRegionDirtyPages :
dataRegionsDirtyPages) {
+ for (GroupPartitionId dirtyPartition :
dataRegionDirtyPages.dirtyPages.partitionIds()) {
+
partitionFilePageStoreManager.getStore(dirtyPartition.getGroupId(),
dirtyPartition.getPartitionId()).updateMetaPageCount();
+ }
+ }
+
+ return new DataRegionsDirtyPages(dataRegionsDirtyPages);
}
CheckpointDirtyPages createAndSortCheckpointDirtyPages(
DataRegionsDirtyPages dataRegionsDirtyPages
) throws IgniteInternalCheckedException {
- List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>>
checkpointPages = new ArrayList<>();
+ List<DataRegionDirtyPages<ArrayDirtyPages>> checkpointDirtyPages = new
ArrayList<>();
int realPagesArrSize = 0;
- for (IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>>
regionDirtyPages : dataRegionsDirtyPages.dirtyPages) {
- FullPageId[] checkpointRegionDirtyPages = new
FullPageId[regionDirtyPages.getValue().size()];
+ for (DataRegionDirtyPages<CollectionDirtyPages> dataRegionDirtyPages :
dataRegionsDirtyPages.dirtyPages) {
+ FullPageId[] pageIds = new
FullPageId[dataRegionDirtyPages.dirtyPages.pageIds().size()];
int pagePos = 0;
- for (FullPageId dirtyPage : regionDirtyPages.getValue()) {
+ for (FullPageId dirtyPage :
dataRegionDirtyPages.dirtyPages.pageIds()) {
assert realPagesArrSize++ !=
dataRegionsDirtyPages.dirtyPageCount :
"Incorrect estimated dirty pages number: " +
dataRegionsDirtyPages.dirtyPageCount;
- checkpointRegionDirtyPages[pagePos++] = dirtyPage;
+ pageIds[pagePos++] = dirtyPage;
}
// Some pages may have been already replaced.
if (pagePos == 0) {
continue;
- } else if (pagePos != checkpointRegionDirtyPages.length) {
- checkpointPages.add(new
IgniteBiTuple<>(regionDirtyPages.getKey(),
Arrays.copyOf(checkpointRegionDirtyPages, pagePos)));
- } else {
- checkpointPages.add(new
IgniteBiTuple<>(regionDirtyPages.getKey(), checkpointRegionDirtyPages));
+ } else if (pagePos != pageIds.length) {
+ pageIds = Arrays.copyOf(pageIds, pagePos);
}
+
+ assert !dataRegionDirtyPages.dirtyPages.partitionIds().isEmpty();
+
+ GroupPartitionId[] partitionIds =
dataRegionDirtyPages.dirtyPages.partitionIds().toArray(new GroupPartitionId[0]);
+
+ checkpointDirtyPages.add(new DataRegionDirtyPages<>(
+ dataRegionDirtyPages.pageMemory,
+ new ArrayDirtyPages(pageIds, partitionIds)
+ ));
}
- List<ForkJoinTask<?>> parallelSortTasks = checkpointPages.stream()
- .map(IgniteBiTuple::getValue)
- .filter(pages -> pages.length >= PARALLEL_SORT_THRESHOLD)
- .map(pages -> parallelSortThreadPool.submit(() ->
Arrays.parallelSort(pages, DIRTY_PAGE_COMPARATOR)))
+ List<ForkJoinTask<?>> parallelSortTasks = checkpointDirtyPages.stream()
Review Comment:
I wish you would add more comments here and there; this code is rather
complicated and data just moves from one place to the other. It's very hard to
track.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -212,4 +241,53 @@ public void writePage(FullPageId fullPageId, ByteBuffer
buf, int tag) throws Ign
}
};
}
+
+ /**
+ * Updates the partition {@link PartitionMetaIo} (pageIdx == 0).
+ *
+ * @throws IgniteInternalCheckedException If failed.
+ */
+ private void updatePartitions() throws IgniteInternalCheckedException {
+ Result<PersistentPageMemory, GroupPartitionId> queueResult = new
Result<>();
+
+ while (!shutdownNow.getAsBoolean() &&
updatePartitionIds.next(queueResult)) {
+ GroupPartitionId partitionId = queueResult.getValue();
+
+ PartitionFilePageStore partitionFilePageStore =
partitionFilePageStoreManager.getStore(
+ partitionId.getGroupId(),
+ partitionId.getPartitionId()
+ );
+
+ ByteBuffer buffer = threadBuf.get();
+
+ PersistentPageMemory pageMemory = queueResult.getKey();
+
+ long partitionMetaPageId =
pageMemory.partitionMetaPageId(partitionId.getGroupId(),
partitionId.getPartitionId());
+
+ long pageAddr = GridUnsafe.bufferAddress(buffer);
+
+ // TODO: IGNITE-16657 Merger will put the page into a partition
file.
+ PartitionMetaIo io = PartitionMetaIo.VERSIONS.latest();
+
+ io.initNewPage(pageAddr, partitionMetaPageId, buffer.capacity());
+
+ PartitionMeta meta = partitionFilePageStore.meta();
+
+ io.setTreeRootPageId(pageAddr, meta.treeRootPageId());
+ io.setReuseListRootPageId(pageAddr, meta.reuseListRootPageId());
+ io.setPageCount(pageAddr, meta.pageCount());
+
+ FullPageId fullPageId = new FullPageId(partitionMetaPageId,
partitionId.getGroupId());
+
+ PageStore store = pageWriter.write(
Review Comment:
So, for regular pages you're using the result of "createPageStoreWriter",
which doesn't require a partition generation tag. But here we have "pageWriter",
which does require a partition generation tag.
Something's messed up here; it shouldn't be this complicated. Why don't we
have a single way of writing pages? Why do we need to know the tag here?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -87,57 +103,66 @@ public class CheckpointPagesWriter implements Runnable {
*
* @param tracker Checkpoint metrics tracker.
* @param writePageIds Queue of dirty page IDs to write.
+ * @param updatePartitionIds Queue of dirty partition IDs to update.
* @param updStores Updating storage.
* @param doneFut Done future.
* @param beforePageWrite Action to be performed before every page write.
* @param log Logger.
* @param threadBuf Thread local byte buffer.
* @param checkpointProgress Checkpoint progress.
* @param pageWriter File page store manager.
+ * @param ioRegistry Page IO registry.
+ * @param partitionFilePageStoreManager Partition file page store manager.
* @param shutdownNow Shutdown supplier.
*/
CheckpointPagesWriter(
IgniteLogger log,
CheckpointMetricsTracker tracker,
- CheckpointDirtyPagesQueue writePageIds,
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
writePageIds,
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> updatePartitionIds,
ConcurrentMap<PageStore, LongAdder> updStores,
CompletableFuture<?> doneFut,
Runnable beforePageWrite,
ThreadLocal<ByteBuffer> threadBuf,
CheckpointProgressImpl checkpointProgress,
CheckpointPageWriter pageWriter,
+ PageIoRegistry ioRegistry,
+ PartitionFilePageStoreManager partitionFilePageStoreManager,
BooleanSupplier shutdownNow
) {
this.log = log;
this.tracker = tracker;
this.writePageIds = writePageIds;
+ this.updatePartitionIds = updatePartitionIds;
this.updStores = updStores;
this.doneFut = doneFut;
this.beforePageWrite = beforePageWrite;
this.threadBuf = threadBuf;
this.checkpointProgress = checkpointProgress;
this.pageWriter = pageWriter;
+ this.ioRegistry = ioRegistry;
+ this.partitionFilePageStoreManager = partitionFilePageStoreManager;
this.shutdownNow = shutdownNow;
}
/** {@inheritDoc} */
@Override
public void run() {
try {
- CheckpointDirtyPagesQueue pagesToRetry = writePages(writePageIds);
-
- if (pagesToRetry.isEmpty()) {
- doneFut.complete(null);
- } else {
- log.info("Checkpoint pages were not written yet due to "
- + "unsuccessful page write lock acquisition and will
be retried [pageCount={}]", pagesToRetry.size());
+ IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
pageIdsToRetry = writePages(writePageIds);
- while (!pagesToRetry.isEmpty()) {
- pagesToRetry = writePages(pagesToRetry);
+ while (!pageIdsToRetry.isEmpty()) {
+ if (log.isInfoEnabled()) {
Review Comment:
Oh my god, you print this message on the first iteration. I consider this a bug;
please change it.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -161,109 +164,24 @@ public CheckpointDirtyPagesQueue toQueue() {
return null;
}
- List<FullPageId> pageIds = dirtyPages.get(regionIndex).getValue();
+ FullPageId[] pageIds = dirtyPages.get(regionIndex).dirtyPages.pageIds;
- if (fromPosition == pageIds.size() - 1 ||
!equalsByGroupAndPartition(pageIds.get(fromPosition), pageIds.get(fromPosition
+ 1))) {
+ if (fromPosition == pageIds.length - 1 ||
!equalsByGroupAndPartition(pageIds[fromPosition], pageIds[fromPosition + 1])) {
return new CheckpointDirtyPagesView(regionIndex, fromPosition,
fromPosition);
}
- FullPageId startPageId = pageIds.get(fromPosition);
+ FullPageId startPageId = pageIds[fromPosition];
FullPageId endPageId = new
FullPageId(pageId(partitionId(startPageId.pageId()) + 1, (byte) 0, 0),
startPageId.groupId());
- int toPosition = binarySearch(pageIds.subList(fromPosition,
pageIds.size()), endPageId, DIRTY_PAGE_COMPARATOR);
+ int toPosition = binarySearch(pageIds, fromPosition, pageIds.length,
endPageId, DIRTY_PAGE_COMPARATOR);
toPosition = toPosition > 0 ? toPosition - 1 : -toPosition - 2;
Review Comment:
Is "toPosition" an "inclusive" index? I don't understand why you subtract 1
when it's positive.
Usually, the lower bound is inclusive and the upper bound is exclusive. It looks
like you violated this rule. If so, why?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ArrayDirtyPages.java:
##########
@@ -15,37 +15,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.pagememory;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
/**
- * Class for storing {@link TableTree} partition metadata.
+ * Dirty page array container.
*/
-class PartitionMeta {
- /** {@link TableTree} root. */
- final FullPageId treeRoot;
+class ArrayDirtyPages {
Review Comment:
Again, I don't get the order of words here. Was it supposed to be
"ArrayOfDirtyPages"?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/CollectionDirtyPages.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+
+/**
+ * Dirty pages collection container.
+ */
+public class CollectionDirtyPages {
+ private final Collection<FullPageId> pageIds;
Review Comment:
Should we explicitly use "Set" here? Or in the next collection?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -1549,18 +1573,14 @@ public long refreshOutdatedPage(int grpId, long pageId,
boolean rmv) {
loadedPages.remove(grpId, effectivePageId(pageId));
}
- CheckpointPages cpPages = checkpointPages;
+ CheckpointPages checkpointPages = this.checkpointPages;
- if (cpPages != null) {
- cpPages.markAsSaved(new FullPageId(pageId, grpId));
+ if (checkpointPages != null) {
+ checkpointPages.markAsSaved(new FullPageId(pageId, grpId));
Review Comment:
I still don't get it. A comment is required here explaining why we are modifying
these collections when there's an outdated page.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -17,89 +17,92 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static java.util.Collections.binarySearch;
+import static java.util.Arrays.binarySearch;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
- * Sorted dirty pages from data regions that should be checkpointed.
- *
- * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ * Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs that should be
+ * checkpointed.
*/
class CheckpointDirtyPages {
- /** Dirty page ID comparator. */
+ /** Dirty page ID comparator by groupId -> partitionId -> pageIdx. */
static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
.comparingInt(FullPageId::groupId)
.thenComparingLong(FullPageId::effectivePageId);
/** Empty checkpoint dirty pages. */
static final CheckpointDirtyPages EMPTY = new
CheckpointDirtyPages(List.of());
- /** Sorted dirty pages from data regions by groupId -> partitionId ->
pageIdx. */
- private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>>
dirtyPages;
-
- /** Total number of dirty pages. */
- private final int dirtyPagesCount;
+ /** Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs. */
+ private final List<DataRegionDirtyPages<ArrayDirtyPages>> dirtyPages;
/**
* Constructor.
*
- * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ * @param dirtyPages Dirty pages of data regions, with sorted page IDs by
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
*/
- public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>>
dirtyPages) {
- this(dirtyPages.entrySet().stream().map(e -> new
IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+ public CheckpointDirtyPages(Map<PersistentPageMemory, ArrayDirtyPages>
dirtyPages) {
+ this(dirtyPages.entrySet().stream()
+ .map(e -> new DataRegionDirtyPages<>(e.getKey(), e.getValue()))
+ .collect(toList())
+ );
}
/**
* Constructor.
*
- * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ * @param dirtyPages Dirty pages of data regions, with sorted page IDs by
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
*/
- public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory,
List<FullPageId>>> dirtyPages) {
+ public CheckpointDirtyPages(List<DataRegionDirtyPages<ArrayDirtyPages>>
dirtyPages) {
assert dirtyPages instanceof RandomAccess : dirtyPages;
this.dirtyPages = dirtyPages;
-
- int count = 0;
-
- for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages :
dirtyPages) {
- assert !pages.getValue().isEmpty() : pages.getKey();
- assert pages.getValue() instanceof RandomAccess : pages.getValue();
-
- count += pages.getValue().size();
- }
-
- dirtyPagesCount = count;
}
/**
- * Returns total number of dirty pages.
+ * Returns total number of dirty page IDs.
*/
public int dirtyPagesCount() {
- return dirtyPagesCount;
+ return dirtyPages.stream().mapToInt(pages ->
pages.dirtyPages.pageIds.length).sum();
Review Comment:
How often is this method called? A pre-calculated value is fast; recounting it
every time is slow. So I'm wondering, why did you do this?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -874,19 +871,37 @@ private void copyInBuffer(long absPtr, ByteBuffer tmpBuf)
{
}
/**
- * Get current prartition generation tag.
+ * Get current partition generation tag.
*
* @param seg Segment.
- * @param fullId Full page id.
+ * @param fullPageId Full page id.
* @return Current partition generation tag.
*/
- private int generationTag(Segment seg, FullPageId fullId) {
+ private int generationTag(Segment seg, FullPageId fullPageId) {
return seg.partGeneration(
- fullId.groupId(),
- partitionId(fullId.pageId())
+ fullPageId.groupId(),
+ partitionId(fullPageId.pageId())
);
}
+ /**
+ * Get current partition generation tag.
+ *
+ * @param fullPageId Full page id.
+ * @return Current partition generation tag.
+ */
+ public int generationTag(FullPageId fullPageId) {
Review Comment:
Why is this public now? Partition generation is internal information, isn't
it?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/CollectionDirtyPages.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+
+/**
+ * Dirty pages collection container.
+ */
+public class CollectionDirtyPages {
Review Comment:
Why not "DirtyPagesCollection"? I don't quite get the order of words here.
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -17,89 +17,92 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static java.util.Collections.binarySearch;
+import static java.util.Arrays.binarySearch;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
- * Sorted dirty pages from data regions that should be checkpointed.
- *
- * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ * Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs that should be
+ * checkpointed.
*/
class CheckpointDirtyPages {
- /** Dirty page ID comparator. */
+ /** Dirty page ID comparator by groupId -> partitionId -> pageIdx. */
static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
.comparingInt(FullPageId::groupId)
.thenComparingLong(FullPageId::effectivePageId);
/** Empty checkpoint dirty pages. */
static final CheckpointDirtyPages EMPTY = new
CheckpointDirtyPages(List.of());
- /** Sorted dirty pages from data regions by groupId -> partitionId ->
pageIdx. */
- private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>>
dirtyPages;
-
- /** Total number of dirty pages. */
- private final int dirtyPagesCount;
+ /** Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs. */
+ private final List<DataRegionDirtyPages<ArrayDirtyPages>> dirtyPages;
/**
* Constructor.
*
- * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ * @param dirtyPages Dirty pages of data regions, with sorted page IDs by
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
*/
- public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>>
dirtyPages) {
- this(dirtyPages.entrySet().stream().map(e -> new
IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+ public CheckpointDirtyPages(Map<PersistentPageMemory, ArrayDirtyPages>
dirtyPages) {
+ this(dirtyPages.entrySet().stream()
+ .map(e -> new DataRegionDirtyPages<>(e.getKey(), e.getValue()))
+ .collect(toList())
+ );
}
/**
* Constructor.
*
- * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ * @param dirtyPages Dirty pages of data regions, with sorted page IDs by
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
*/
- public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory,
List<FullPageId>>> dirtyPages) {
+ public CheckpointDirtyPages(List<DataRegionDirtyPages<ArrayDirtyPages>>
dirtyPages) {
assert dirtyPages instanceof RandomAccess : dirtyPages;
this.dirtyPages = dirtyPages;
-
- int count = 0;
-
- for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages :
dirtyPages) {
- assert !pages.getValue().isEmpty() : pages.getKey();
- assert pages.getValue() instanceof RandomAccess : pages.getValue();
-
- count += pages.getValue().size();
- }
-
- dirtyPagesCount = count;
}
/**
- * Returns total number of dirty pages.
+ * Returns total number of dirty page IDs.
*/
public int dirtyPagesCount() {
- return dirtyPagesCount;
+ return dirtyPages.stream().mapToInt(pages ->
pages.dirtyPages.pageIds.length).sum();
}
/**
- * Returns a queue of dirty pages to be written to a checkpoint.
+ * Returns a queue of dirty page IDs to be written to a checkpoint.
*/
- public CheckpointDirtyPagesQueue toQueue() {
- return new CheckpointDirtyPagesQueue();
+ public IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
toDirtyPageIdQueue() {
+ List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>> dirtyPageIds =
dirtyPages.stream()
+ .map(pages -> new IgniteBiTuple<>(pages.pageMemory,
pages.dirtyPages.pageIds))
+ .collect(toList());
+
+ return new IgniteConcurrentMultiPairQueue<>(dirtyPageIds);
}
/**
- * Looks for dirty page views for a specific group and partition.
+ * Returns a queue of dirty partition IDs to be modified to a checkpoint.
+ */
+ public IgniteConcurrentMultiPairQueue<PersistentPageMemory,
GroupPartitionId> toDirtyPartitionIdQueue() {
Review Comment:
Why is this a queue? Do we process it in parallel while holding checkpoint
write lock?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -17,89 +17,92 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static java.util.Collections.binarySearch;
+import static java.util.Arrays.binarySearch;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
- * Sorted dirty pages from data regions that should be checkpointed.
- *
- * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ * Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs that should be
+ * checkpointed.
*/
class CheckpointDirtyPages {
- /** Dirty page ID comparator. */
+ /** Dirty page ID comparator by groupId -> partitionId -> pageIdx. */
static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
.comparingInt(FullPageId::groupId)
.thenComparingLong(FullPageId::effectivePageId);
/** Empty checkpoint dirty pages. */
static final CheckpointDirtyPages EMPTY = new
CheckpointDirtyPages(List.of());
- /** Sorted dirty pages from data regions by groupId -> partitionId ->
pageIdx. */
- private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>>
dirtyPages;
-
- /** Total number of dirty pages. */
- private final int dirtyPagesCount;
+ /** Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs. */
+ private final List<DataRegionDirtyPages<ArrayDirtyPages>> dirtyPages;
Review Comment:
I suppose that DataRegionDirtyPages is defined as
`class DataRegionDirtyPages<T>` and T doesn't have to be a dirty pages
collection of any sort, is this correct?
This type is quite a mouthful; would it be more convenient to have
DataRegionData<ArrayDirtyPages>? Or something else that's less redundant?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PersistentPageMemory.java:
##########
@@ -920,19 +935,21 @@ public int invalidate(int grpId, int partId) {
int tag = 0;
- for (Segment seg : segments) {
- seg.writeLock().lock();
+ for (Segment segment : segments) {
+ segment.writeLock().lock();
try {
- int newTag = seg.incrementPartGeneration(grpId, partId);
+ int newTag = segment.incrementPartGeneration(grpId,
partId);
if (tag == 0) {
tag = newTag;
}
assert tag == newTag;
+
+ segment.dirtyPartitions.remove(new GroupPartitionId(grpId,
partId));
Review Comment:
This implies that each segment has an independent set of partitions. I believe
that this set will be the same for all segments (on average). Was this a
conscious decision?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java:
##########
@@ -30,7 +30,7 @@
* View of pages which should be stored during current checkpoint.
*/
public class CheckpointPages {
- private final Set<FullPageId> segCheckpointPages;
+ private final Set<FullPageId> segmentDirtyPages;
Review Comment:
Ok, these can just be "segmentPages" then. There's no way that a clean page
can get here, right?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -113,29 +116,29 @@ public CheckpointDirtyPagesQueue toQueue() {
FullPageId endPageId = new FullPageId(pageId(partId + 1, (byte) 0, 0),
grpId);
for (int i = 0; i < dirtyPages.size(); i++) {
- List<FullPageId> pageIds = dirtyPages.get(i).getValue();
+ FullPageId[] pageIds = dirtyPages.get(i).dirtyPages.pageIds;
Review Comment:
This comment is not for this particular line. I find it strange that
CheckpointDirtyPagesView is always used for a single partition, yet it's not
reflected in its name. The name "findView" is also very technical; it doesn't
reflect the meaning - here we find all pages that belong to a given partition,
right?
Also, passing a DataRegion or PageMemory instance here would help you
avoid unnecessary binary searches.
Also, why exactly do we have a list of arrays instead of the Map, as in the
original constructor?
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -17,89 +17,92 @@
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-import static java.util.Collections.binarySearch;
+import static java.util.Arrays.binarySearch;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
import static
org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.util.IgniteConcurrentMultiPairQueue;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
/**
- * Sorted dirty pages from data regions that should be checkpointed.
- *
- * <p>Dirty pages should be sorted by groupId -> partitionId -> pageIdx.
+ * Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs that should be
+ * checkpointed.
*/
class CheckpointDirtyPages {
- /** Dirty page ID comparator. */
+ /** Dirty page ID comparator by groupId -> partitionId -> pageIdx. */
static final Comparator<FullPageId> DIRTY_PAGE_COMPARATOR = Comparator
.comparingInt(FullPageId::groupId)
.thenComparingLong(FullPageId::effectivePageId);
/** Empty checkpoint dirty pages. */
static final CheckpointDirtyPages EMPTY = new
CheckpointDirtyPages(List.of());
- /** Sorted dirty pages from data regions by groupId -> partitionId ->
pageIdx. */
- private final List<IgniteBiTuple<PersistentPageMemory, List<FullPageId>>>
dirtyPages;
-
- /** Total number of dirty pages. */
- private final int dirtyPagesCount;
+ /** Dirty pages of data regions, with sorted page IDs by {@link
#DIRTY_PAGE_COMPARATOR} and unsorted partition IDs. */
+ private final List<DataRegionDirtyPages<ArrayDirtyPages>> dirtyPages;
/**
* Constructor.
*
- * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ * @param dirtyPages Dirty pages of data regions, with sorted page IDs by
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
*/
- public CheckpointDirtyPages(Map<PersistentPageMemory, List<FullPageId>>
dirtyPages) {
- this(dirtyPages.entrySet().stream().map(e -> new
IgniteBiTuple<>(e.getKey(), e.getValue())).collect(toList()));
+ public CheckpointDirtyPages(Map<PersistentPageMemory, ArrayDirtyPages>
dirtyPages) {
+ this(dirtyPages.entrySet().stream()
+ .map(e -> new DataRegionDirtyPages<>(e.getKey(), e.getValue()))
+ .collect(toList())
+ );
}
/**
* Constructor.
*
- * @param dirtyPages Sorted dirty pages from data regions by groupId ->
partitionId -> pageIdx.
+ * @param dirtyPages Dirty pages of data regions, with sorted page IDs by
{@link #DIRTY_PAGE_COMPARATOR} and unsorted partition IDs.
*/
- public CheckpointDirtyPages(List<IgniteBiTuple<PersistentPageMemory,
List<FullPageId>>> dirtyPages) {
+ public CheckpointDirtyPages(List<DataRegionDirtyPages<ArrayDirtyPages>>
dirtyPages) {
assert dirtyPages instanceof RandomAccess : dirtyPages;
this.dirtyPages = dirtyPages;
-
- int count = 0;
-
- for (IgniteBiTuple<PersistentPageMemory, List<FullPageId>> pages :
dirtyPages) {
- assert !pages.getValue().isEmpty() : pages.getKey();
- assert pages.getValue() instanceof RandomAccess : pages.getValue();
-
- count += pages.getValue().size();
- }
-
- dirtyPagesCount = count;
}
/**
- * Returns total number of dirty pages.
+ * Returns total number of dirty page IDs.
*/
public int dirtyPagesCount() {
- return dirtyPagesCount;
+ return dirtyPages.stream().mapToInt(pages ->
pages.dirtyPages.pageIds.length).sum();
}
/**
- * Returns a queue of dirty pages to be written to a checkpoint.
+ * Returns a queue of dirty page IDs to be written to a checkpoint.
*/
- public CheckpointDirtyPagesQueue toQueue() {
- return new CheckpointDirtyPagesQueue();
+ public IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId>
toDirtyPageIdQueue() {
+ List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>> dirtyPageIds =
dirtyPages.stream()
+ .map(pages -> new IgniteBiTuple<>(pages.pageMemory,
pages.dirtyPages.pageIds))
+ .collect(toList());
+
+ return new IgniteConcurrentMultiPairQueue<>(dirtyPageIds);
Review Comment:
This looks like a very suboptimal step. Can we consider/discuss another
implementation? Why does it have to be a list of pairs and not a wrapper of
the existing list of arrays? Data copying could be avoided.
Plus, that list of arrays was itself created from the map; we just convert
data back and forth, so we should probably decrease the number of conversions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]