nizhikov commented on code in PR #10950:
URL: https://github.com/apache/ignite/pull/10950#discussion_r1349861579


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.dump.DumpEntry;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractCreateSnapshotFutureTask;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
+import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cacheDataFilename;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK;
+
+/**
+ * Task that creates a cache group dump.
+ * A dump is a consistent snapshot of cache entries.
+ * The directory structure is the same as for a full snapshot, but each partition is saved in a "part-0.dump" file.
+ * Each file is a sequence of {@link DumpEntry} written one by one.
+ *
+ * @see Dump
+ * @see DumpEntry
+ */
+public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask implements DumpEntryChangeListener {
+    /** Dump file extension. */
+    public static final String DUMP_FILE_EXT = ".dump";
+
+    /** Root dump directory. */
+    private final File dumpDir;
+
+    /** */
+    private final FileIOFactory ioFactory;
+
+    /**
+     * Dump contexts.
+     * Key is [group_id, partition_id] combined in a single long value.
+     *
+     * @see #toLong(int, int)
+     */
+    private final Map<Long, PartitionDumpContext> dumpCtxs = new ConcurrentHashMap<>();
+
+    /** Partitions of each cache group for which the local node is primary. */
+    private final Map<Integer, Set<Integer>> grpPrimaries = new ConcurrentHashMap<>();
+
+    /**
+     * Map shared across all instances of {@link PartitionDumpContext} and {@link DumpEntrySerializer}.
+     * We use a per-thread buffer because the number of threads is smaller than the number of partitions.
+     * The regular partition count is {@link RendezvousAffinityFunction#DFLT_PARTITION_COUNT}
+     * while the thread count is {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT}, which is significantly less.
+     */
+    private final ConcurrentMap<Long, ByteBuffer> thLocBufs = new ConcurrentHashMap<>();
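
For reference, the per-thread-buffer idiom the javadoc above describes amounts to
the following sketch (the helper name and capacity are illustrative, not part of
this change):

    ByteBuffer threadLocalBuffer(ConcurrentMap<Long, ByteBuffer> bufs, int cap) {
        // One buffer per thread id: threads are far fewer than partitions,
        // so this bounds allocations well below a per-partition scheme.
        return bufs.computeIfAbsent(Thread.currentThread().getId(), id -> ByteBuffer.allocate(cap));
    }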
+
+    /**
+     * @param cctx Cache context.
+     * @param srcNodeId Node id that caused the snapshot task creation.
+     * @param reqId Snapshot operation request ID.
+     * @param dumpName Dump name.
+     * @param dumpDir Dump directory.
+     * @param ioFactory IO factory.
+     * @param snpSndr Snapshot sender.
+     * @param parts Parts to dump.
+     */
+    public CreateDumpFutureTask(
+        GridCacheSharedContext<?, ?> cctx,
+        UUID srcNodeId,
+        UUID reqId,
+        String dumpName,
+        File dumpDir,
+        FileIOFactory ioFactory,
+        SnapshotSender snpSndr,
+        Map<Integer, Set<Integer>> parts
+    ) {
+        super(
+            cctx,
+            srcNodeId,
+            reqId,
+            dumpName,
+            snpSndr,
+            parts
+        );
+
+        this.dumpDir = dumpDir;
+        this.ioFactory = ioFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean start() {
+        try {
+            if (log.isInfoEnabled())
+                log.info("Start cache dump [name=" + snpName + ", grps=" + parts.keySet() + ']');
+
+            createDumpLock();
+
+            processPartitions();
+
+            prepare();
+
+            saveSnapshotData();
+        }
+        catch (IgniteCheckedException | IOException e) {
+            acceptException(e);
+        }
+
+        return false; // Don't wait for checkpoint.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processPartitions() throws IgniteCheckedException {
+        super.processPartitions();
+
+        processed.values().forEach(parts -> parts.remove(INDEX_PARTITION));
+    }
+
+    /** Prepares all data structures to dump entries. */
+    private void prepare() throws IOException, IgniteCheckedException {
+        for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+            int grp = e.getKey();
+
+            File grpDumpDir = groupDirectory(cctx.cache().cacheGroup(grp));
+
+            if (!grpDumpDir.mkdirs())
+                throw new IgniteCheckedException("Dump directory can't be created: " + grpDumpDir);
+
+            CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+            for (GridCacheContext<?, ?> cctx : gctx.caches())
+                cctx.dumpListener(this);
+
+            grpPrimaries.put(
+                grp,
+                gctx.affinity().primaryPartitions(gctx.shared().kernalContext().localNodeId(), gctx.affinity().lastVersion())
+            );
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected List<CompletableFuture<Void>> saveCacheConfigs() {
+        return processed.keySet().stream().map(grp -> runAsync(() -> {
+            CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+            File grpDir = groupDirectory(gctx);
+
+            IgniteUtils.ensureDirectory(grpDir, "dump group directory", null);
+
+            for (GridCacheContext<?, ?> cacheCtx : gctx.caches()) {
+                DynamicCacheDescriptor desc = cctx.kernalContext().cache().cacheDescriptor(cacheCtx.cacheId());
+
+                StoredCacheData cacheData = new StoredCacheData(new CacheConfiguration(desc.cacheConfiguration()));
+
+                cacheData.queryEntities(desc.schema().entities());
+                cacheData.sql(desc.sql());
+
+                cctx.cache().configManager().writeCacheData(cacheData, new File(grpDir, cacheDataFilename(cacheData.config())));
+            }
+        })).collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected List<CompletableFuture<Void>> saveGroup(int grp, Set<Integer> grpParts) {
+        long start = System.currentTimeMillis();
+
+        AtomicLong entriesCnt = new AtomicLong();
+        AtomicLong writtenEntriesCnt = new AtomicLong();
+        AtomicLong changedEntriesCnt = new AtomicLong();
+
+        String name = cctx.cache().cacheGroup(grp).cacheOrGroupName();
+
+        CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grp);
+
+        if (log.isInfoEnabled())
+            log.info("Start group dump [name=" + name + ", id=" + grp + ']');
+
+        List<CompletableFuture<Void>> futs = grpParts.stream().map(part -> runAsync(() -> {
+            long entriesCnt0 = 0;
+            long writtenEntriesCnt0 = 0;
+
+            try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
+                try (GridCloseableIterator<CacheDataRow> rows = gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
+                    if (rows == null)
+                        throw new IgniteCheckedException("Partition missing [part=" + part + ']');
+
+                    while (rows.hasNext()) {
+                        CacheDataRow row = rows.next();
+
+                        assert row.partition() == part;
+
+                        int cache = row.cacheId() == 0 ? grp : row.cacheId();
+
+                        if (dumpCtx.writeForIterator(cache, row.expireTime(), row.key(), row.value(), row.version()))
+                            writtenEntriesCnt0++;
+
+                        entriesCnt0++;
+                    }
+                }
+
+                entriesCnt.addAndGet(entriesCnt0);
+                writtenEntriesCnt.addAndGet(writtenEntriesCnt0);
+                changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue());
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Finish group partition dump [name=" + name +
+                        ", id=" + grp +
+                        ", part=" + part +
+                        ", time=" + (System.currentTimeMillis() - start) +
+                        ", iterEntriesCnt=" + entriesCnt0 +
+                        ", writtenIterEntriesCnt=" + writtenEntriesCnt0 +
+                        ", changedEntriesCnt=" + dumpCtx.changedCnt.intValue() + ']');
+                }
+            }
+        })).collect(Collectors.toList());
+
+        int futsSize = futs.size();
+
+        CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize])).whenComplete((res, t) -> {
+            clearDumpListener(gctx);
+
+            if (log.isInfoEnabled()) {
+                log.info("Finish group dump [name=" + name +
+                    ", id=" + grp +
+                    ", time=" + (System.currentTimeMillis() - start) +
+                    ", iterEntriesCnt=" + entriesCnt.get() +
+                    ", writtenIterEntriesCnt=" + writtenEntriesCnt.get() +
+                    ", changedEntriesCnt=" + changedEntriesCnt.get() + ']');
+            }
+        });
+
+        return futs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeChange(GridCacheContext cctx, KeyCacheObject key, CacheObject val, long expireTime, GridCacheVersion ver) {
+        try {
+            int part = key.partition();
+            int grp = cctx.groupId();
+
+            assert part != -1;
+
+            if (!processed.get(grp).contains(part))
+                return;
+
+            dumpContext(grp, part).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
+        }
+        catch (IgniteException e) {
+            acceptException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CompletableFuture<Void> closeAsync() {
+        if (closeFut == null) {
+            dumpCtxs.values().forEach(PartitionDumpContext::close);
+
+            Throwable err0 = err.get();
+
+            Set<GroupPartitionId> taken = new HashSet<>();
+
+            for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+                int grp = e.getKey();
+
+                clearDumpListener(cctx.cache().cacheGroup(grp));
+
+                for (Integer part : e.getValue())
+                    taken.add(new GroupPartitionId(grp, part));
+            }
+
+            closeFut = CompletableFuture.runAsync(
+                () -> {
+                    thLocBufs.clear();
+                    onDone(new SnapshotFutureTaskResult(taken, null), err0);
+                },
+                cctx.kernalContext().pools().getSystemExecutorService()
+            );
+        }
+
+        return closeFut;
+    }
+
+    /** */
+    private void clearDumpListener(CacheGroupContext gctx) {
+        for (GridCacheContext<?, ?> cctx : gctx.caches())
+            cctx.dumpListener(null);
+    }
+
+    /** */
+    private void createDumpLock() throws IgniteCheckedException, IOException {
+        File nodeDumpDir = IgniteSnapshotManager.nodeDumpDirectory(dumpDir, cctx);
+
+        if (!nodeDumpDir.mkdirs())
+            throw new IgniteCheckedException("Can't create node dump 
directory: " + nodeDumpDir.getAbsolutePath());
+
+        File lock = new File(nodeDumpDir, DUMP_LOCK);
+
+        if (!lock.createNewFile())
+            throw new IgniteCheckedException("Lock file can't be created or 
already exists: " + lock.getAbsolutePath());
+    }
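
A note on the locking scheme above: java.io.File#createNewFile is atomic, so
exactly one caller can create the marker file; a concurrent or repeated dump
into the same node directory observes false and fails fast. A minimal
illustration (paths are made up):

    File lock = new File("/tmp/dump/node0/dump.lock");
    boolean first = lock.createNewFile();   // true: file created atomically, lock acquired
    boolean second = lock.createNewFile();  // false: file already exists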
+
+    /** */
+    private PartitionDumpContext dumpContext(int grp, int part) {
+        return dumpCtxs.computeIfAbsent(
+            toLong(grp, part),
+            key -> new PartitionDumpContext(cctx.kernalContext().cache().cacheGroup(grp), part, thLocBufs)
+        );
+    }
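
The [group_id, partition_id] packing referenced by the dumpCtxs javadoc is the
usual int-pair-to-long scheme; a plausible sketch of toLong(int, int), which is
defined elsewhere in the task and may differ:

    static long toLong(int grp, int part) {
        // Group id in the high 32 bits, partition id in the low 32 bits.
        return ((long)grp << 32) | (part & 0xFFFFFFFFL);
    }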
+
+    /**
+     * Context of a single partition dump.
+     */
+    private class PartitionDumpContext implements Closeable {
+        /** Group id. */
+        final int grp;
+
+        /** Partition id. */
+        final int part;
+
+        /** Keys of cache entries changed by the user during the partition dump, grouped by cache id. */
+        final Map<Integer, Set<KeyCacheObject>> changed;
+
+        /** Count of entries changed during dump creation. */
+        LongAdder changedCnt = new LongAdder();
+
+        /** Partition dump file. Lazily initialized to avoid creating files for empty partitions. */
+        final FileIO file;
+
+        /** Last version at the time of dump start. Can be used only for primary. */
+        @Nullable final GridCacheVersion startVer;
+
+        /** Last version at the time of dump start. Can be used only for primary. */

Review Comment:
   Good catch, thanks. Javadoc extended.


