timoninmaxim commented on code in PR #10936:
URL: https://github.com/apache/ignite/pull/10936#discussion_r1328557286
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java:
##########
@@ -541,8 +541,12 @@ private boolean inMemoryCdcCache(CacheConfiguration<?, ?>
cfg) {
public File cacheConfigurationFile(CacheConfiguration<?, ?> ccfg) {
File cacheWorkDir = cacheWorkDir(ccfg);
- return ccfg.getGroupName() == null ? new File(cacheWorkDir,
CACHE_DATA_FILENAME) :
- new File(cacheWorkDir, ccfg.getName() + CACHE_DATA_FILENAME);
+ return new File(cacheWorkDir, cachDataFilename(ccfg));
+ }
+
+ /** @return Name of cache data filename. */
+ public static String cachDataFilename(CacheConfiguration<?, ?> ccfg) {
Review Comment:
Typo: `cach` -> `cache` (i.e. the method should be named `cacheDataFilename`).
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -1394,13 +1408,17 @@ else if (!F.isEmpty(req.warnings())) {
storeWarnings(snpReq);
}
- removeLastMetaStorageKey();
+ if (!req.dump()) {
Review Comment:
For readability, it's better to avoid negated conditions. Instead, use
```
if (req.dump()) {
}
else {
}
```
##########
modules/indexing/src/test/java/org/apache/ignite/internal/cdc/SqlCdcTest.java:
##########
@@ -376,7 +376,7 @@ else if (evt.configuration().getName().equals(USER)) {
}
/** */
- static List<List<?>> executeSql(IgniteEx node, String sqlText, Object...
args) {
+ public static List<List<?>> executeSql(IgniteEx node, String sqlText,
Object... args) {
Review Comment:
Let's not use methods from different components, to reduce conflicts in the
future, especially when it's a one-liner.
##########
modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java:
##########
@@ -71,7 +72,8 @@ public IgniteCommandRegistry() {
new DefragmentationCommand(),
new PerformanceStatisticsCommand(),
new ConsistencyCommand(),
- new CdcCommand()
+ new CdcCommand(),
+ new DumpCommand()
Review Comment:
Let's remove the command from this PR
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractCreateSnapshotFutureTask;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static
org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK;
+
+/**
+ * Task creates cache group dump.
+ * Dump is a consistent snapshot of cache entries.
+ * Directories structure is same as a full snapshot but each partitions saved
in "part-0.dump" file.
+ * Files structure is a set of {@link DumpEntry} written one by one.
+ *
+ * @see Dump
+ * @see DumpEntry
+ */
+public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask
implements DumpEntryChangeListener {
+ /** Dump files name. */
+ public static final String DUMP_FILE_EXT = ".dump";
+
+ /** Root dump directory. */
+ private final File dumpDir;
+
+ /** */
+ private final FileIOFactory ioFactory;
+
+ /**
+ * Dump contextes.
+ * Key is [group_id, partition_id] combined in single long value.
+ *
+ * @see #toLong(int, int)
+ */
+ private final Map<Long, PartitionDumpContext> dumpCtxs = new
ConcurrentHashMap<>();
+
+ /** Local node is primary for set of group partitions. */
+ private final Map<Integer, Set<Integer>> grpPrimaries = new
ConcurrentHashMap();
+
+ /**
+ * @param cctx Cache context.
+ * @param srcNodeId Node id which cause snapshot task creation.
+ * @param reqId Snapshot operation request ID.
+ * @param dumpName Dump name.
+ * @param ioFactory IO factory.
+ * @param snpSndr Snapshot sender.
+ * @param parts Parts to dump.
+ */
+ public CreateDumpFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ UUID reqId,
+ String dumpName,
+ File dumpDir,
+ FileIOFactory ioFactory,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts
+ ) {
+ super(
+ cctx,
+ srcNodeId,
+ reqId,
+ dumpName,
+ snpSndr,
+ parts
+ );
+
+ this.dumpDir = dumpDir;
+ this.ioFactory = ioFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean start() {
+ try {
+ log.info("Start cache dump [name=" + snpName + ", grps=" +
parts.keySet() + ']');
+
+ createDumpLock();
+
+ processPartitions();
+
+ prepare();
+
+ saveSnapshotData();
+ }
+ catch (IgniteCheckedException | IOException e) {
+ acceptException(e);
+ }
+
+ return false; // Don't wait for checkpoint.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void processPartitions() throws IgniteCheckedException
{
+ super.processPartitions();
+
+ processed.values().forEach(parts -> parts.remove(INDEX_PARTITION));
+ }
+
+ /** Prepares all data structures to dump entries. */
+ private void prepare() throws IOException, IgniteCheckedException {
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grp = e.getKey();
+
+ File grpDumpDir = groupDirectory(cctx.cache().cacheGroup(grp));
+
+ if (!grpDumpDir.mkdirs())
+ throw new IgniteCheckedException("Dump directory can't be
created: " + grpDumpDir);
+
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+ for (GridCacheContext<?, ?> cctx : gctx.caches())
+ cctx.dumpListener(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<CompletableFuture<Void>> saveCacheConfigsCopy() {
+ return parts.keySet().stream().map(grp -> runAsync(() -> {
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+ File grpDir = groupDirectory(gctx);
+
+ IgniteUtils.ensureDirectory(grpDir, "dump group directory", null);
+
+ for (GridCacheContext<?, ?> cacheCtx : gctx.caches()) {
+ DynamicCacheDescriptor desc =
cctx.kernalContext().cache().cacheDescriptor(cacheCtx.cacheId());
+
+ StoredCacheData cacheData = new StoredCacheData(new
CacheConfiguration(desc.cacheConfiguration()));
+
+ cacheData.queryEntities(desc.schema().entities());
+ cacheData.sql(desc.sql());
+
+ cctx.cache().configManager().writeCacheData(cacheData, new
File(grpDir, cachDataFilename(cacheData.config())));
+ }
+ })).collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<CompletableFuture<Void>> saveGroup(int grp,
Set<Integer> grpParts) {
+ long start = System.currentTimeMillis();
+
+ AtomicLong entriesCnt = new AtomicLong();
+ AtomicLong changedEntriesCnt = new AtomicLong();
+ AtomicLong partsRemain = new AtomicLong(grpParts.size());
+
+ String name = cctx.cache().cacheGroup(grp).cacheOrGroupName();
+
+ CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grp);
+
+ log.info("Start group dump [name=" + name + ", id=" + grp + ']');
+
+ return grpParts.stream().map(part -> runAsync(() -> {
+ long entriesCnt0 = 0;
+
+ try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
+ try (GridCloseableIterator<CacheDataRow> rows =
gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
+ if (rows == null)
+ throw new IgniteCheckedException("Partition missing
[part=" + part + ']');
+
+ while (rows.hasNext()) {
+ CacheDataRow row = rows.next();
+
+ assert row.partition() == part;
+
+ int cache = row.cacheId() == 0 ? grp : row.cacheId();
+
+ if (dumpCtx.writeForIterator(cache, row.expireTime(),
row.key(), row.value(), row.version()))
+ entriesCnt0++;
+ }
+ }
+
+ entriesCnt.addAndGet(entriesCnt0);
+ changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue());
+
+ long remain = partsRemain.decrementAndGet();
+
+ if (remain == 0) {
+ clearDumpListener(gctx);
+
+ log.info("Finish group dump [name=" + name +
+ ", id=" + grp +
+ ", time=" + (System.currentTimeMillis() - start) +
+ ", iteratorEntriesCount=" + entriesCnt +
+ ", changedEntriesCount=" + changedEntriesCnt + ']');
+ }
+ else if (log.isDebugEnabled()) {
+ log.debug("Finish group partition dump [name=" + name +
+ ", id=" + grp +
+ ", part=" + part +
+ ", time=" + (System.currentTimeMillis() - start) +
+ ", iteratorEntriesCount=" + entriesCnt +
+ ", changedEntriesCount=" + changedEntriesCnt + ']');
+
+ }
+ }
+ })).collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeChange(GridCacheContext cctx, KeyCacheObject
key, CacheObject val, long expireTime, GridCacheVersion ver) {
+ try {
+ assert key.partition() != -1;
+
+ dumpContext(cctx.groupId(),
key.partition()).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
+ }
+ catch (IgniteException e) {
+ acceptException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CompletableFuture<Void> closeAsync() {
+ if (closeFut == null) {
+ dumpCtxs.values().forEach(PartitionDumpContext::close);
+
+ Throwable err0 = err.get();
+
+ Set<GroupPartitionId> taken = new HashSet<>();
+
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grp = e.getKey();
+
+ clearDumpListener(cctx.cache().cacheGroup(grp));
+
+ for (Integer part : e.getValue())
+ taken.add(new GroupPartitionId(grp, part));
+ }
+
+ closeFut = CompletableFuture.runAsync(
+ () -> onDone(new SnapshotFutureTaskResult(taken, null), err0),
+ cctx.kernalContext().pools().getSystemExecutorService()
+ );
+ }
+
+ return closeFut;
+ }
+
+ /** */
+ private void clearDumpListener(CacheGroupContext gctx) {
+ for (GridCacheContext<?, ?> cctx : gctx.caches())
+ cctx.dumpListener(null);
+ }
+
+ /** */
+ private void createDumpLock() throws IgniteCheckedException, IOException {
+ File nodeDumpDir = IgniteSnapshotManager.nodeDumpDirectory(dumpDir,
cctx);
+
+ if (!nodeDumpDir.mkdirs())
+ throw new IgniteCheckedException("Can't create node dump
directory: " + nodeDumpDir.getAbsolutePath());
+
+ File lock = new File(nodeDumpDir, DUMP_LOCK);
+
+ if (!lock.createNewFile())
+ throw new IgniteCheckedException("Lock file can't be created or
already exists: " + lock.getAbsolutePath());
+ }
+
+ /** */
+ private PartitionDumpContext dumpContext(int grp, int part) {
+ return dumpCtxs.computeIfAbsent(
+ toLong(grp, part),
+ key -> {
+ try {
+ return new PartitionDumpContext(
+ cctx.kernalContext().cache().cacheGroup(grp),
+ part,
+ new File(groupDirectory(cctx.cache().cacheGroup(grp)),
PART_FILE_PREFIX + part + DUMP_FILE_EXT)
+ );
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ );
+ }
+
+ /**
+ * Context of dump single partition.
+ */
+ private class PartitionDumpContext implements Closeable {
+ /** Group id. */
+ final int grp;
+
+ /** Partition id. */
+ final int part;
+
+ /** Hashes of cache keys of entries changed by the user during
partition dump. */
+ final Map<Integer, Set<KeyCacheObject>> changed;
+
+ /** Count of entries changed during dump creation. */
+ LongAdder changedCnt = new LongAdder();
+
+ /** Partition dump file. Lazily initialized to prevent creation files
for empty partitions. */
+ final FileIO file;
+
+ /** Last version on time of dump start. Can be used only for primary.
*/
+ @Nullable final GridCacheVersion startVer;
+
+ /** Topology Version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Partition serializer. */
+ DumpEntrySerializer serdes;
+
+ /** If {@code true} context is closed. */
+ volatile boolean closed;
+
+ /** Count of writers. When count becomes {@code 0} context must be
closed. */
+ private final AtomicInteger writers = new AtomicInteger(1); //
Iterator writing entries to this context, by default.
+
+ /**
+ * @param gctx Group id.
+ * @param part Partition id.
+ * @param dumpFile Dump file path.
+ */
+ public PartitionDumpContext(CacheGroupContext gctx, int part, File
dumpFile) throws IOException {
+ assert gctx != null;
+
+ this.part = part;
+ grp = gctx.groupId();
+ topVer = gctx.topology().lastTopologyChangeVersion();
+
+ boolean primary = grpPrimaries.computeIfAbsent(
Review Comment:
Let's calculate this in the CreateDumpFutureTask constructor, in the
`disco-notifier` thread, instead of creating a bottleneck during active
`beforeChange` invocations.
##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/AbstractCacheDumpTest.java:
##########
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.IntConsumer;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+import javax.management.DynamicMBean;
+import javax.management.MBeanException;
+import javax.management.ReflectionException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.Dump.DumpedPartitionIterator;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.platform.model.ACL;
+import org.apache.ignite.platform.model.Key;
+import org.apache.ignite.platform.model.Role;
+import org.apache.ignite.platform.model.User;
+import org.apache.ignite.platform.model.Value;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.management.api.CommandMBean.INVOKE;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_RUNNING_DIR_KEY;
+import static org.apache.ignite.platform.model.AccessLevel.SUPER;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+@RunWith(Parameterized.class)
+public abstract class AbstractCacheDumpTest extends GridCommonAbstractTest {
+ /** */
+ public static final String GRP = "grp";
+
+ /** */
+ public static final String CACHE_0 = "cache-0";
+
+ /** */
+ public static final String CACHE_1 = "cache-1";
+
+ /** */
+ public static final int KEYS_CNT = 1000;
+
+ /** */
+ public static final String DMP_NAME = "dump";
+
+ /** */
+ protected static final IntFunction<User> USER_FACTORY = i ->
+ new User(i, ACL.values()[Math.abs(i) % ACL.values().length], new
Role("Role" + i, SUPER));
+
+ /** */
+ @Parameterized.Parameter
+ public int nodes;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public int backups;
+
+ /** */
+ @Parameterized.Parameter(2)
+ public boolean persistence;
+
+ /** */
+ @Parameterized.Parameter(3)
+ public CacheAtomicityMode mode;
+
+ /** */
+ @Parameterized.Parameters(name =
"nodes={0},backups={1},persistence={2},mode={3}")
+ public static List<Object[]> params() {
+ List<Object[]> params = new ArrayList<>();
+
+ for (int nodes : new int[]{1, 3})
+ for (int backups : new int[]{0, 1})
+ for (boolean persistence : new boolean[]{true, false})
+ for (CacheAtomicityMode mode :
CacheAtomicityMode._values()) {
+ if (nodes == 1 && backups != 0)
+ continue;
+
+ params.add(new Object[]{nodes, backups, persistence,
mode});
+ }
+
+ return params;
+ }
+
+ /** */
+ protected int snpPoolSz = 1;
+
+ /** */
+ protected IgniteEx cli;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setSnapshotThreadPoolSize(snpPoolSz)
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration().setPersistenceEnabled(persistence)))
+ .setCacheConfiguration(
+ new CacheConfiguration<>()
+ .setName(DEFAULT_CACHE_NAME)
+ .setBackups(backups)
+ .setAtomicityMode(mode)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setAffinity(new
RendezvousAffinityFunction().setPartitions(20)),
+ new CacheConfiguration<>()
+ .setGroupName(GRP)
+ .setName(CACHE_0)
+ .setBackups(backups)
+ .setAtomicityMode(mode)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setAffinity(new
RendezvousAffinityFunction().setPartitions(20)),
+ new CacheConfiguration<>()
+ .setGroupName(GRP)
+ .setName(CACHE_1)
+ .setBackups(backups)
+ .setAtomicityMode(mode)
+ .setWriteSynchronizationMode(FULL_SYNC)
+ .setAffinity(new
RendezvousAffinityFunction().setPartitions(20))
+ );
+ }
+
+ /** */
+ protected IgniteEx startGridAndFillCaches() throws Exception {
+ IgniteEx ign = (IgniteEx)startGridsMultiThreaded(nodes);
+
+ cli = startClientGrid(nodes);
+
+ ign.cluster().state(ClusterState.ACTIVE);
+
+ putData(ign.cache(DEFAULT_CACHE_NAME), ign.cache(CACHE_0),
ign.cache(CACHE_1));
+
+ return ign;
+ }
+
+ /** */
+ protected T2<CountDownLatch, IgniteInternalFuture<?>>
runDumpAsyncAndStopBeforeStart() throws IgniteInterruptedCheckedException {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ List<Ignite> ignites = Ignition.allGrids();
+
+ for (Ignite ign : ignites) {
+
((IgniteEx)ign).context().pools().getSnapshotExecutorService().submit(() -> {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ });
+ }
+
+ IgniteInternalFuture<Object> dumpFut = runAsync(() ->
createDump((IgniteEx)ignites.get(0)));
+
+ // Waiting while dump will be setup: task planned after change
listener set.
+ assertTrue(waitForCondition(() -> {
+ for (Ignite ign : ignites) {
+ if (ign.configuration().isClientMode() == Boolean.TRUE)
+ continue;
+
+ if
(((ThreadPoolExecutor)((IgniteEx)ign).context().pools().getSnapshotExecutorService()).getTaskCount()
<= 1)
+ return false;
+ }
+
+ return true;
+ }, 10 * 1000));
+
+ return new T2<>(latch, dumpFut);
+ }
+
+ /** */
+ protected void putData(
+ IgniteCache<Object, Object> cache,
+ IgniteCache<Object, Object> grpCache0,
+ IgniteCache<Object, Object> grpCache1
+ ) {
+ IntStream.range(0, KEYS_CNT).forEach(i -> {
+ cache.put(i, i);
+ grpCache0.put(i, USER_FACTORY.apply(i));
+ grpCache1.put(new Key(i), new Value(String.valueOf(i)));
+ });
+ }
+
+ /** */
+ void checkDump(IgniteEx ign) throws Exception {
+ checkDump(ign, DMP_NAME);
+ }
+
+ /** */
+ void checkDump(IgniteEx ign, String name) throws Exception {
+ if (persistence)
+
assertNull(ign.context().cache().context().database().metaStorage().read(SNP_RUNNING_DIR_KEY));
+
+ Dump dump = dump(ign, name);
+
+ List<SnapshotMetadata> metadata = dump.metadata();
+
+ assertNotNull(metadata);
+ assertEquals(nodes, metadata.size());
+
+ for (SnapshotMetadata meta : metadata) {
+ assertEquals(name, meta.snapshotName());
+ assertTrue(meta.dump());
+ }
+
+ List<String> nodesDirs = dump.nodesDirectories();
+
+ assertEquals(nodes, nodesDirs.size());
+
+ Set<Integer> keys = new HashSet<>();
+ int dfltDumpSz = 0;
+ int grpDumpSz = 0;
+
+ CacheObjectContext coCtx =
ign.context().cache().context().cacheObjectContext(CU.cacheId(DEFAULT_CACHE_NAME));
+ CacheObjectContext coCtx0 =
ign.context().cache().context().cacheObjectContext(CU.cacheId(CACHE_0));
+ CacheObjectContext coCtx1 =
ign.context().cache().context().cacheObjectContext(CU.cacheId(CACHE_1));
+
+ for (String nodeDir : nodesDirs) {
+ List<StoredCacheData> ccfgs = dump.configs(nodeDir,
CU.cacheId(DEFAULT_CACHE_NAME));
+
+ assertNotNull(ccfgs);
+ assertEquals(1, ccfgs.size());
+
+ assertEquals(DEFAULT_CACHE_NAME,
ccfgs.get(0).configuration().getName());
+ assertFalse(ccfgs.get(0).sql());
+ assertTrue(ccfgs.get(0).queryEntities().isEmpty());
+
+ ccfgs = dump.configs(nodeDir, CU.cacheId(GRP));
+
+ assertNotNull(ccfgs);
+ assertEquals(2, ccfgs.size());
+
+ ccfgs.sort(Comparator.comparing(d -> d.config().getName()));
+
+ CacheConfiguration ccfg0 = ccfgs.get(0).configuration();
+ CacheConfiguration ccfg1 = ccfgs.get(1).configuration();
+
+ assertEquals(GRP, ccfg0.getGroupName());
+ assertEquals(CACHE_0, ccfg0.getName());
+
+ assertEquals(GRP, ccfg1.getGroupName());
+ assertEquals(CACHE_1, ccfg1.getName());
+
+ assertFalse(ccfgs.get(0).sql());
+ assertFalse(ccfgs.get(1).sql());
+ assertTrue(ccfgs.get(0).queryEntities().isEmpty());
+ assertTrue(ccfgs.get(1).queryEntities().isEmpty());
+
+ List<Integer> parts = dump.partitions(nodeDir,
CU.cacheId(DEFAULT_CACHE_NAME));
+
+ for (int part : parts) {
+ try (DumpedPartitionIterator iter = dump.iterator(nodeDir,
CU.cacheId(DEFAULT_CACHE_NAME), part)) {
+ while (iter.hasNext()) {
+ DumpEntry e = iter.next();
+
+ checkDefaultCacheEntry(e, coCtx);
+
+ keys.add(e.key().<Integer>value(coCtx, true));
+
+ dfltDumpSz++;
+ }
+ }
+ }
+
+ parts = dump.partitions(nodeDir, CU.cacheId(GRP));
+
+ for (int part : parts) {
+ try (DumpedPartitionIterator iter = dump.iterator(nodeDir,
CU.cacheId(GRP), part)) {
+ while (iter.hasNext()) {
+ DumpEntry e = iter.next();
+
+ checkGroupEntry(e, coCtx0, coCtx1);
+
+ grpDumpSz++;
+ }
+ }
+ }
+ }
+
+ assertEquals(KEYS_CNT + KEYS_CNT * backups, dfltDumpSz);
+ assertEquals(2 * (KEYS_CNT + KEYS_CNT * backups), grpDumpSz);
+
+ IntStream.range(0, KEYS_CNT).forEach(key ->
assertTrue(keys.contains(key)));
+ }
+
+ /** */
+ protected void checkDefaultCacheEntry(DumpEntry e, CacheObjectContext
coCtx) {
+ assertNotNull(e);
+
+ Integer key = e.key().<Integer>value(coCtx, true);
+
+ assertEquals(key, e.value().<Integer>value(coCtx, true));
+ }
+
+ /** */
+ protected void checkGroupEntry(DumpEntry e, CacheObjectContext coCtx0,
CacheObjectContext coCtx1) {
+ assertNotNull(e);
+
+ if (e.cacheId() == CU.cacheId(CACHE_0))
+ assertEquals(USER_FACTORY.apply(e.key().value(coCtx0, true)),
e.value().value(coCtx0, true));
+ else {
+ assertNotNull(e.key().<Key>value(coCtx1, true));
+ assertNotNull(e.value().<Value>value(coCtx1, true));
+ }
+ }
+
+ /** */
+ protected void insertOrUpdate(IgniteEx ignite, int i) {
+ insertOrUpdate(ignite, i, i);
+ }
+
+ /** */
+ protected void insertOrUpdate(IgniteEx ignite, int i, int val) {
+ ignite.cache(DEFAULT_CACHE_NAME).put(i, val);
+ IgniteCache<Object, Object> cache = ignite.cache(CACHE_0);
+ IgniteCache<Object, Object> cache1 = ignite.cache(CACHE_1);
+
+ if (mode == CacheAtomicityMode.TRANSACTIONAL) {
+ try (Transaction tx = ignite.transactions().txStart()) {
Review Comment:
There is no need to explicitly start a transaction for a single put. A
transaction is started automatically for every single put/delete/etc. operation.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractCreateSnapshotFutureTask;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static
org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK;
+
+/**
+ * Task creates cache group dump.
+ * Dump is a consistent snapshot of cache entries.
+ * Directories structure is same as a full snapshot but each partitions saved
in "part-0.dump" file.
+ * Files structure is a set of {@link DumpEntry} written one by one.
+ *
+ * @see Dump
+ * @see DumpEntry
+ */
+public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask
implements DumpEntryChangeListener {
+ /** Dump files name. */
+ public static final String DUMP_FILE_EXT = ".dump";
+
+ /** Root dump directory. */
+ private final File dumpDir;
+
+ /** */
+ private final FileIOFactory ioFactory;
+
+ /**
+ * Dump contextes.
+ * Key is [group_id, partition_id] combined in single long value.
+ *
+ * @see #toLong(int, int)
+ */
+ private final Map<Long, PartitionDumpContext> dumpCtxs = new
ConcurrentHashMap<>();
+
+ /** Local node is primary for set of group partitions. */
+ private final Map<Integer, Set<Integer>> grpPrimaries = new
ConcurrentHashMap();
Review Comment:
Raw type: use the diamond operator here, i.e. ConcurrentHashMap**<>**();
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java:
##########
@@ -228,6 +234,11 @@ public boolean onlyPrimary() {
return onlyPrimary;
}
+ /** @return If {@code true} then metadata describe cache dump. */
Review Comment:
`describe` -> `describes`
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractCreateSnapshotFutureTask;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static
org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK;
+
+/**
+ * Task creates cache group dump.
+ * Dump is a consistent snapshot of cache entries.
+ * Directories structure is same as a full snapshot but each partitions saved
in "part-0.dump" file.
+ * Files structure is a set of {@link DumpEntry} written one by one.
+ *
+ * @see Dump
+ * @see DumpEntry
+ */
+public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask
implements DumpEntryChangeListener {
+ /** Dump files name. */
+ public static final String DUMP_FILE_EXT = ".dump";
+
+ /** Root dump directory. */
+ private final File dumpDir;
+
+ /** */
+ private final FileIOFactory ioFactory;
+
+ /**
+ * Dump contextes.
+ * Key is [group_id, partition_id] combined in single long value.
+ *
+ * @see #toLong(int, int)
+ */
+ private final Map<Long, PartitionDumpContext> dumpCtxs = new
ConcurrentHashMap<>();
+
+ /** Local node is primary for set of group partitions. */
+ private final Map<Integer, Set<Integer>> grpPrimaries = new
ConcurrentHashMap();
+
+ /**
+ * @param cctx Cache context.
+ * @param srcNodeId Node id which cause snapshot task creation.
+ * @param reqId Snapshot operation request ID.
+ * @param dumpName Dump name.
+ * @param ioFactory IO factory.
+ * @param snpSndr Snapshot sender.
+ * @param parts Parts to dump.
+ */
+ public CreateDumpFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ UUID reqId,
+ String dumpName,
+ File dumpDir,
+ FileIOFactory ioFactory,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts
+ ) {
+ super(
+ cctx,
+ srcNodeId,
+ reqId,
+ dumpName,
+ snpSndr,
+ parts
+ );
+
+ this.dumpDir = dumpDir;
+ this.ioFactory = ioFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean start() {
+ try {
+ log.info("Start cache dump [name=" + snpName + ", grps=" +
parts.keySet() + ']');
+
+ createDumpLock();
+
+ processPartitions();
+
+ prepare();
+
+ saveSnapshotData();
+ }
+ catch (IgniteCheckedException | IOException e) {
+ acceptException(e);
+ }
+
+ return false; // Don't wait for checkpoint.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void processPartitions() throws IgniteCheckedException
{
+ super.processPartitions();
+
+ processed.values().forEach(parts -> parts.remove(INDEX_PARTITION));
+ }
+
+ /** Prepares all data structures to dump entries. */
+ private void prepare() throws IOException, IgniteCheckedException {
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grp = e.getKey();
+
+ File grpDumpDir = groupDirectory(cctx.cache().cacheGroup(grp));
+
+ if (!grpDumpDir.mkdirs())
+ throw new IgniteCheckedException("Dump directory can't be
created: " + grpDumpDir);
+
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+ for (GridCacheContext<?, ?> cctx : gctx.caches())
+ cctx.dumpListener(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<CompletableFuture<Void>> saveCacheConfigsCopy() {
+ return parts.keySet().stream().map(grp -> runAsync(() -> {
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+ File grpDir = groupDirectory(gctx);
+
+ IgniteUtils.ensureDirectory(grpDir, "dump group directory", null);
+
+ for (GridCacheContext<?, ?> cacheCtx : gctx.caches()) {
+ DynamicCacheDescriptor desc =
cctx.kernalContext().cache().cacheDescriptor(cacheCtx.cacheId());
+
+ StoredCacheData cacheData = new StoredCacheData(new
CacheConfiguration(desc.cacheConfiguration()));
+
+ cacheData.queryEntities(desc.schema().entities());
+ cacheData.sql(desc.sql());
+
+ cctx.cache().configManager().writeCacheData(cacheData, new
File(grpDir, cachDataFilename(cacheData.config())));
+ }
+ })).collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<CompletableFuture<Void>> saveGroup(int grp,
Set<Integer> grpParts) {
+ long start = System.currentTimeMillis();
+
+ AtomicLong entriesCnt = new AtomicLong();
+ AtomicLong changedEntriesCnt = new AtomicLong();
+ AtomicLong partsRemain = new AtomicLong(grpParts.size());
+
+ String name = cctx.cache().cacheGroup(grp).cacheOrGroupName();
+
+ CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grp);
+
+ log.info("Start group dump [name=" + name + ", id=" + grp + ']');
+
+ return grpParts.stream().map(part -> runAsync(() -> {
+ long entriesCnt0 = 0;
+
+ try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
+ try (GridCloseableIterator<CacheDataRow> rows =
gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
+ if (rows == null)
+ throw new IgniteCheckedException("Partition missing
[part=" + part + ']');
+
+ while (rows.hasNext()) {
+ CacheDataRow row = rows.next();
+
+ assert row.partition() == part;
+
+ int cache = row.cacheId() == 0 ? grp : row.cacheId();
+
+ if (dumpCtx.writeForIterator(cache, row.expireTime(),
row.key(), row.value(), row.version()))
+ entriesCnt0++;
+ }
+ }
+
+ entriesCnt.addAndGet(entriesCnt0);
+ changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue());
+
+ long remain = partsRemain.decrementAndGet();
+
+ if (remain == 0) {
+ clearDumpListener(gctx);
+
+ log.info("Finish group dump [name=" + name +
+ ", id=" + grp +
+ ", time=" + (System.currentTimeMillis() - start) +
+ ", iteratorEntriesCount=" + entriesCnt +
+ ", changedEntriesCount=" + changedEntriesCnt + ']');
+ }
+ else if (log.isDebugEnabled()) {
+ log.debug("Finish group partition dump [name=" + name +
+ ", id=" + grp +
+ ", part=" + part +
+ ", time=" + (System.currentTimeMillis() - start) +
+ ", iteratorEntriesCount=" + entriesCnt +
+ ", changedEntriesCount=" + changedEntriesCnt + ']');
+
+ }
+ }
+ })).collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeChange(GridCacheContext cctx, KeyCacheObject
key, CacheObject val, long expireTime, GridCacheVersion ver) {
+ try {
+ assert key.partition() != -1;
+
+ dumpContext(cctx.groupId(),
key.partition()).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
+ }
+ catch (IgniteException e) {
+ acceptException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CompletableFuture<Void> closeAsync() {
+ if (closeFut == null) {
+ dumpCtxs.values().forEach(PartitionDumpContext::close);
+
+ Throwable err0 = err.get();
+
+ Set<GroupPartitionId> taken = new HashSet<>();
+
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grp = e.getKey();
+
+ clearDumpListener(cctx.cache().cacheGroup(grp));
+
+ for (Integer part : e.getValue())
+ taken.add(new GroupPartitionId(grp, part));
+ }
+
+ closeFut = CompletableFuture.runAsync(
+ () -> onDone(new SnapshotFutureTaskResult(taken, null), err0),
+ cctx.kernalContext().pools().getSystemExecutorService()
+ );
+ }
+
+ return closeFut;
+ }
+
+ /** */
+ private void clearDumpListener(CacheGroupContext gctx) {
+ for (GridCacheContext<?, ?> cctx : gctx.caches())
+ cctx.dumpListener(null);
+ }
+
+ /** */
+ private void createDumpLock() throws IgniteCheckedException, IOException {
+ File nodeDumpDir = IgniteSnapshotManager.nodeDumpDirectory(dumpDir,
cctx);
+
+ if (!nodeDumpDir.mkdirs())
+ throw new IgniteCheckedException("Can't create node dump
directory: " + nodeDumpDir.getAbsolutePath());
+
+ File lock = new File(nodeDumpDir, DUMP_LOCK);
+
+ if (!lock.createNewFile())
+ throw new IgniteCheckedException("Lock file can't be created or
already exists: " + lock.getAbsolutePath());
+ }
+
+ /** */
+ private PartitionDumpContext dumpContext(int grp, int part) {
+ return dumpCtxs.computeIfAbsent(
+ toLong(grp, part),
+ key -> {
+ try {
+ return new PartitionDumpContext(
+ cctx.kernalContext().cache().cacheGroup(grp),
+ part,
+ new File(groupDirectory(cctx.cache().cacheGroup(grp)),
PART_FILE_PREFIX + part + DUMP_FILE_EXT)
Review Comment:
This line and line 324 use the same CacheGroupContext. It looks like the
file can be created inside the PartitionDumpContext constructor.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java:
##########
@@ -2348,6 +2353,16 @@ public AtomicReference<IgniteInternalFuture<Boolean>>
lastRemoveAllJobFut() {
return lastRmvAllJobFut;
}
+ /** */
+ public DumpEntryChangeListener dumpListener() {
+ return dumpLsnr;
+ }
+
+ /** */
+ public void dumpListener(DumpEntryChangeListener dumpEntryChangeLsnr) {
+ this.dumpLsnr = dumpEntryChangeLsnr;
Review Comment:
Let's add an assert that the current `dumpLsnr` is null.
##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteCacheDumpSelfTest.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.platform.model.Key;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_RUNNING_DIR_KEY;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.CreateDumpFutureTask.DUMP_FILE_EXT;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/** */
+public class IgniteCacheDumpSelfTest extends AbstractCacheDumpTest {
Review Comment:
Tests needed:
1. Writes to ignite-sys-cache (must be skipped) are filtered out.
2. Concurrent dump creation isn't allowed.
3. Data changes made in an EntryProcessor are reflected.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractCreateSnapshotFutureTask.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.util.lang.IgniteThrowableRunner;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+
+/**
+ *
+ */
+public abstract class AbstractCreateSnapshotFutureTask extends
AbstractSnapshotFutureTask<SnapshotFutureTaskResult> {
+ /**
+ * Cache group and corresponding partitions collected under the PME lock.
+ * For full snapshot additional checkpoint write lock required.
+ * @see
SnapshotFutureTask#onMarkCheckpointBegin(CheckpointListener.Context)
+ */
+ protected final Map<Integer, Set<Integer>> processed = new HashMap<>();
+
+ /** Future which will be completed when task requested to be closed. Will
be executed on system pool. */
+ protected volatile CompletableFuture<Void> closeFut;
+
+ /**
+ * @param cctx Shared context.
+ * @param srcNodeId Node id which cause snapshot task creation.
+ * @param reqId Snapshot operation request ID.
+ * @param snpName Snapshot name.
+ * @param snpSndr Factory which produces snapshot sender instance.
+ * @param parts Partitions to be processed.
+ */
+ protected AbstractCreateSnapshotFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ UUID reqId,
+ String snpName,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts
+ ) {
+ super(cctx, srcNodeId, reqId, snpName, snpSndr, parts);
+ }
+
+ /** */
+ protected abstract List<CompletableFuture<Void>> saveCacheConfigsCopy();
Review Comment:
Let's rename `saveCacheConfigsCopy` -> `saveCacheConfigs`.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractCreateSnapshotFutureTask;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFutureTaskResult;
+import
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static
org.apache.ignite.internal.processors.cache.GridLocalConfigManager.cachDataFilename;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DUMP_LOCK;
+
+/**
+ * Task creates cache group dump.
+ * Dump is a consistent snapshot of cache entries.
+ * Directories structure is same as a full snapshot but each partitions saved
in "part-0.dump" file.
+ * Files structure is a set of {@link DumpEntry} written one by one.
+ *
+ * @see Dump
+ * @see DumpEntry
+ */
+public class CreateDumpFutureTask extends AbstractCreateSnapshotFutureTask
implements DumpEntryChangeListener {
+ /** Dump files name. */
+ public static final String DUMP_FILE_EXT = ".dump";
+
+ /** Root dump directory. */
+ private final File dumpDir;
+
+ /** */
+ private final FileIOFactory ioFactory;
+
+ /**
+ * Dump contextes.
+ * Key is [group_id, partition_id] combined in single long value.
+ *
+ * @see #toLong(int, int)
+ */
+ private final Map<Long, PartitionDumpContext> dumpCtxs = new
ConcurrentHashMap<>();
+
+ /** Local node is primary for set of group partitions. */
+ private final Map<Integer, Set<Integer>> grpPrimaries = new
ConcurrentHashMap();
+
+ /**
+ * @param cctx Cache context.
+ * @param srcNodeId Node id which cause snapshot task creation.
+ * @param reqId Snapshot operation request ID.
+ * @param dumpName Dump name.
+ * @param ioFactory IO factory.
+ * @param snpSndr Snapshot sender.
+ * @param parts Parts to dump.
+ */
+ public CreateDumpFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ UUID reqId,
+ String dumpName,
+ File dumpDir,
+ FileIOFactory ioFactory,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts
+ ) {
+ super(
+ cctx,
+ srcNodeId,
+ reqId,
+ dumpName,
+ snpSndr,
+ parts
+ );
+
+ this.dumpDir = dumpDir;
+ this.ioFactory = ioFactory;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean start() {
+ try {
+ log.info("Start cache dump [name=" + snpName + ", grps=" +
parts.keySet() + ']');
+
+ createDumpLock();
+
+ processPartitions();
+
+ prepare();
+
+ saveSnapshotData();
+ }
+ catch (IgniteCheckedException | IOException e) {
+ acceptException(e);
+ }
+
+ return false; // Don't wait for checkpoint.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void processPartitions() throws IgniteCheckedException
{
+ super.processPartitions();
+
+ processed.values().forEach(parts -> parts.remove(INDEX_PARTITION));
+ }
+
+ /** Prepares all data structures to dump entries. */
+ private void prepare() throws IOException, IgniteCheckedException {
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grp = e.getKey();
+
+ File grpDumpDir = groupDirectory(cctx.cache().cacheGroup(grp));
+
+ if (!grpDumpDir.mkdirs())
+ throw new IgniteCheckedException("Dump directory can't be
created: " + grpDumpDir);
+
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+ for (GridCacheContext<?, ?> cctx : gctx.caches())
+ cctx.dumpListener(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<CompletableFuture<Void>> saveCacheConfigsCopy() {
+ return parts.keySet().stream().map(grp -> runAsync(() -> {
+ CacheGroupContext gctx = cctx.cache().cacheGroup(grp);
+
+ File grpDir = groupDirectory(gctx);
+
+ IgniteUtils.ensureDirectory(grpDir, "dump group directory", null);
+
+ for (GridCacheContext<?, ?> cacheCtx : gctx.caches()) {
+ DynamicCacheDescriptor desc =
cctx.kernalContext().cache().cacheDescriptor(cacheCtx.cacheId());
+
+ StoredCacheData cacheData = new StoredCacheData(new
CacheConfiguration(desc.cacheConfiguration()));
+
+ cacheData.queryEntities(desc.schema().entities());
+ cacheData.sql(desc.sql());
+
+ cctx.cache().configManager().writeCacheData(cacheData, new
File(grpDir, cachDataFilename(cacheData.config())));
+ }
+ })).collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<CompletableFuture<Void>> saveGroup(int grp,
Set<Integer> grpParts) {
+ long start = System.currentTimeMillis();
+
+ AtomicLong entriesCnt = new AtomicLong();
+ AtomicLong changedEntriesCnt = new AtomicLong();
+ AtomicLong partsRemain = new AtomicLong(grpParts.size());
+
+ String name = cctx.cache().cacheGroup(grp).cacheOrGroupName();
+
+ CacheGroupContext gctx = cctx.kernalContext().cache().cacheGroup(grp);
+
+ log.info("Start group dump [name=" + name + ", id=" + grp + ']');
+
+ return grpParts.stream().map(part -> runAsync(() -> {
+ long entriesCnt0 = 0;
+
+ try (PartitionDumpContext dumpCtx = dumpContext(grp, part)) {
+ try (GridCloseableIterator<CacheDataRow> rows =
gctx.offheap().reservedIterator(part, dumpCtx.topVer)) {
+ if (rows == null)
+ throw new IgniteCheckedException("Partition missing
[part=" + part + ']');
+
+ while (rows.hasNext()) {
+ CacheDataRow row = rows.next();
+
+ assert row.partition() == part;
+
+ int cache = row.cacheId() == 0 ? grp : row.cacheId();
+
+ if (dumpCtx.writeForIterator(cache, row.expireTime(),
row.key(), row.value(), row.version()))
+ entriesCnt0++;
+ }
+ }
+
+ entriesCnt.addAndGet(entriesCnt0);
+ changedEntriesCnt.addAndGet(dumpCtx.changedCnt.intValue());
+
+ long remain = partsRemain.decrementAndGet();
+
+ if (remain == 0) {
+ clearDumpListener(gctx);
+
+ log.info("Finish group dump [name=" + name +
+ ", id=" + grp +
+ ", time=" + (System.currentTimeMillis() - start) +
+ ", iteratorEntriesCount=" + entriesCnt +
+ ", changedEntriesCount=" + changedEntriesCnt + ']');
+ }
+ else if (log.isDebugEnabled()) {
+ log.debug("Finish group partition dump [name=" + name +
+ ", id=" + grp +
+ ", part=" + part +
+ ", time=" + (System.currentTimeMillis() - start) +
+ ", iteratorEntriesCount=" + entriesCnt +
+ ", changedEntriesCount=" + changedEntriesCnt + ']');
+
+ }
+ }
+ })).collect(Collectors.toList());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void beforeChange(GridCacheContext cctx, KeyCacheObject
key, CacheObject val, long expireTime, GridCacheVersion ver) {
+ try {
+ assert key.partition() != -1;
+
+ dumpContext(cctx.groupId(),
key.partition()).writeChanged(cctx.cacheId(), expireTime, key, val, ver);
+ }
+ catch (IgniteException e) {
+ acceptException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CompletableFuture<Void> closeAsync() {
+ if (closeFut == null) {
+ dumpCtxs.values().forEach(PartitionDumpContext::close);
+
+ Throwable err0 = err.get();
+
+ Set<GroupPartitionId> taken = new HashSet<>();
+
+ for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) {
+ int grp = e.getKey();
+
+ clearDumpListener(cctx.cache().cacheGroup(grp));
+
+ for (Integer part : e.getValue())
+ taken.add(new GroupPartitionId(grp, part));
+ }
+
+ closeFut = CompletableFuture.runAsync(
+ () -> onDone(new SnapshotFutureTaskResult(taken, null), err0),
+ cctx.kernalContext().pools().getSystemExecutorService()
+ );
+ }
+
+ return closeFut;
+ }
+
+ /** */
+ private void clearDumpListener(CacheGroupContext gctx) {
+ for (GridCacheContext<?, ?> cctx : gctx.caches())
+ cctx.dumpListener(null);
+ }
+
+ /** */
+ private void createDumpLock() throws IgniteCheckedException, IOException {
+ File nodeDumpDir = IgniteSnapshotManager.nodeDumpDirectory(dumpDir,
cctx);
+
+ if (!nodeDumpDir.mkdirs())
+ throw new IgniteCheckedException("Can't create node dump
directory: " + nodeDumpDir.getAbsolutePath());
+
+ File lock = new File(nodeDumpDir, DUMP_LOCK);
+
+ if (!lock.createNewFile())
+ throw new IgniteCheckedException("Lock file can't be created or
already exists: " + lock.getAbsolutePath());
+ }
+
+ /** */
+ private PartitionDumpContext dumpContext(int grp, int part) {
+ return dumpCtxs.computeIfAbsent(
+ toLong(grp, part),
+ key -> {
+ try {
+ return new PartitionDumpContext(
+ cctx.kernalContext().cache().cacheGroup(grp),
+ part,
+ new File(groupDirectory(cctx.cache().cacheGroup(grp)),
PART_FILE_PREFIX + part + DUMP_FILE_EXT)
+ );
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ );
+ }
+
+ /**
+ * Context of dump single partition.
+ */
+ private class PartitionDumpContext implements Closeable {
+ /** Group id. */
+ final int grp;
+
+ /** Partition id. */
+ final int part;
+
+ /** Hashes of cache keys of entries changed by the user during
partition dump. */
+ final Map<Integer, Set<KeyCacheObject>> changed;
+
+ /** Count of entries changed during dump creation. */
+ LongAdder changedCnt = new LongAdder();
+
+ /** Partition dump file. Lazily initialized to prevent creation files
for empty partitions. */
+ final FileIO file;
+
+ /** Last version on time of dump start. Can be used only for primary.
*/
+ @Nullable final GridCacheVersion startVer;
+
+ /** Topology Version. */
+ private final AffinityTopologyVersion topVer;
+
+ /** Partition serializer. */
+ DumpEntrySerializer serdes;
+
+ /** If {@code true} context is closed. */
+ volatile boolean closed;
+
+ /** Count of writers. When count becomes {@code 0} context must be
closed. */
+ private final AtomicInteger writers = new AtomicInteger(1); //
Iterator writing entries to this context, by default.
+
+ /**
+ * @param gctx Group id.
+ * @param part Partition id.
+ * @param dumpFile Dump file path.
+ */
+ public PartitionDumpContext(CacheGroupContext gctx, int part, File
dumpFile) throws IOException {
+ assert gctx != null;
+
+ this.part = part;
+ grp = gctx.groupId();
+ topVer = gctx.topology().lastTopologyChangeVersion();
Review Comment:
This topology version can be calculated after PME has finished and the
topology has changed. Let's persist the topology version for CreateDumpFutureTask.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]