anton-vinogradov commented on a change in pull request #8909:
URL: https://github.com/apache/ignite/pull/8909#discussion_r642359504



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/ChangeDataCapture.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import 
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.startup.cmdline.ChangeDataCaptureCommandLineStartup;
+
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+
+/**
+ * Change Data Capture(CDC) application.
+ * Application run independently of Ignite node process and provide ability 
for the {@link ChangeDataCaptureConsumer}
+ * to consume events({@link ChangeDataCaptureEvent}) from WAL segments.
+ * User should responsible {@link ChangeDataCaptureConsumer} implementation 
with custom consumption logic.
+ *
+ * Ignite node should be explicitly configured for using {@link 
ChangeDataCapture}.
+ * <ol>
+ *     <li>Set {@link 
DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)} to true.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setChangeDataCaptureWalPath(String)} to path to the 
directory
+ *     to store WAL segments for CDC.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout 
for
+ *     force WAL rollover, so new events will be available for consumptions 
with the predicted time.</li>
+ * </ol>
+ *
+ * When {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} is true 
then Ignite node on each WAL segment
+ * rollover creates hard link to archive WAL segment in
+ * {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} directory. 
{@link ChangeDataCapture} application takes
+ * segment file and consumes events from it.
+ * After successful consumption (see {@link 
ChangeDataCaptureConsumer#onEvents(Iterator)}) WAL segment will be deleted
+ * from directory.
+ *
+ * Several Ignite nodes can be started on the same host.
+ * If your deployment done with custom consistent id then you should specify 
it via
+ * {@link IgniteConfiguration#setConsistentId(Serializable)} in provided 
{@link IgniteConfiguration}.
+ *
+ * Application works as follows:
+ * <ol>
+ *     <li>Search node work directory based on provided {@link 
IgniteConfiguration}.</li>
+ *     <li>Await for creation of CDC directory if it not exists.</li>
+ *     <li>Acquire file lock to ensure exclusive consumption.</li>
+ *     <li>Loads state of consumption if it exists.</li>
+ *     <li>Infinetely wait for new available segment and process it.</li>
+ * </ol>
+ *
+ * @see DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)
+ * @see DataStorageConfiguration#setChangeDataCaptureWalPath(String)
+ * @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
+ * @see ChangeDataCaptureCommandLineStartup
+ * @see ChangeDataCaptureConsumer
+ * @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
+ */
+public class ChangeDataCapture implements Runnable {
+    /** */
+    public static final String ERR_MSG = "Persistence disabled. Capture Data 
Change can't run!";
+
+    /** State dir. */
+    public static final String STATE_DIR = "state";
+
+    /** Ignite configuration. */
+    private final IgniteConfiguration igniteCfg;
+
+    /** Spring resource context. */
+    private final GridSpringResourceContext ctx;
+
+    /** Change Data Capture configuration. */
+    private final ChangeDataCaptureConfiguration cdcCfg;
+
+    /** WAL iterator factory. */
+    private final IgniteWalIteratorFactory factory;
+
+    /** Events consumer. */
+    private final WALRecordsConsumer<?, ?> consumer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Change Data Capture directory. */
+    private Path cdcDir;
+
+    /** Binary meta directory. */
+    private File binaryMeta;
+
+    /** Marshaller directory. */
+    private File marshaller;
+
+    /** Change Data Capture state. */
+    private ChangeDataCaptureConsumerState state;
+
+    /** Save state to start from. */
+    private WALPointer initState;
+
+    /** Stopped flag. */
+    private volatile boolean stopped;
+
+    /** Already processed segments. */
+    private final List<Path> processed = new ArrayList<>();
+
+    /**
+     * @param igniteCfg Ignite configuration.
+     * @param ctx Spring resource context.
+     * @param cdcCfg Change Data Capture configuration.
+     */
+    public ChangeDataCapture(
+        IgniteConfiguration igniteCfg,
+        GridSpringResourceContext ctx,
+        ChangeDataCaptureConfiguration cdcCfg) {
+        this.igniteCfg = new IgniteConfiguration(igniteCfg);
+        this.ctx = ctx;
+        this.cdcCfg = cdcCfg;
+
+        initWorkDir(this.igniteCfg);
+
+        log = logger(this.igniteCfg);
+
+        consumer = new WALRecordsConsumer<>(cdcCfg.getConsumer(), log);
+
+        factory = new IgniteWalIteratorFactory(log);
+    }
+
+    /** Runs Change Data Capture. */
+    @Override public void run() {
+        synchronized (this) {
+            if (stopped)
+                return;
+        }
+
+        try {
+            runX();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Runs Change Data Capture application with possible exception. */
+    public void runX() throws Exception {
+        if (!CU.isPersistenceEnabled(igniteCfg)) {
+            log.error(ERR_MSG);
+
+            throw new IllegalArgumentException(ERR_MSG);
+        }
+
+        PdsFolderSettings<ChangeDataCaptureFileLockHolder> settings =
+            new PdsFolderResolver<>(igniteCfg, log, null, 
this::tryLock).resolve();
+
+        if (settings == null)
+            throw new IgniteException("Can't find PDS folder!");
+
+        ChangeDataCaptureFileLockHolder lock = 
settings.getLockedFileLockHolder();
+
+        if (lock == null) {
+            File consIdDir = new File(settings.persistentStoreRootPath(), 
settings.folderName());
+
+            lock = tryLock(consIdDir);
+
+            if (lock == null)
+                throw new IgniteException("Can't lock Change Data Capture dir 
" + consIdDir.getAbsolutePath());

Review comment:
       data should be presented as "It happened [it=it_is, 
happened=never_before_and_now_again]"
   Please refactor every exception and logging this way

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/ChangeDataCapture.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import 
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.startup.cmdline.ChangeDataCaptureCommandLineStartup;
+
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+
+/**
+ * Change Data Capture(CDC) application.
+ * Application run independently of Ignite node process and provide ability 
for the {@link ChangeDataCaptureConsumer}
+ * to consume events({@link ChangeDataCaptureEvent}) from WAL segments.
+ * User should responsible {@link ChangeDataCaptureConsumer} implementation 
with custom consumption logic.
+ *
+ * Ignite node should be explicitly configured for using {@link 
ChangeDataCapture}.
+ * <ol>
+ *     <li>Set {@link 
DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)} to true.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setChangeDataCaptureWalPath(String)} to path to the 
directory
+ *     to store WAL segments for CDC.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout 
for
+ *     force WAL rollover, so new events will be available for consumptions 
with the predicted time.</li>
+ * </ol>
+ *
+ * When {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} is true 
then Ignite node on each WAL segment
+ * rollover creates hard link to archive WAL segment in
+ * {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} directory. 
{@link ChangeDataCapture} application takes
+ * segment file and consumes events from it.
+ * After successful consumption (see {@link 
ChangeDataCaptureConsumer#onEvents(Iterator)}) WAL segment will be deleted
+ * from directory.
+ *
+ * Several Ignite nodes can be started on the same host.
+ * If your deployment done with custom consistent id then you should specify 
it via
+ * {@link IgniteConfiguration#setConsistentId(Serializable)} in provided 
{@link IgniteConfiguration}.
+ *
+ * Application works as follows:
+ * <ol>
+ *     <li>Search node work directory based on provided {@link 
IgniteConfiguration}.</li>
+ *     <li>Await for creation of CDC directory if it not exists.</li>
+ *     <li>Acquire file lock to ensure exclusive consumption.</li>
+ *     <li>Loads state of consumption if it exists.</li>
+ *     <li>Infinetely wait for new available segment and process it.</li>
+ * </ol>
+ *
+ * @see DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)
+ * @see DataStorageConfiguration#setChangeDataCaptureWalPath(String)
+ * @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
+ * @see ChangeDataCaptureCommandLineStartup
+ * @see ChangeDataCaptureConsumer
+ * @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
+ */
+public class ChangeDataCapture implements Runnable {
+    /** */
+    public static final String ERR_MSG = "Persistence disabled. Capture Data 
Change can't run!";
+
+    /** State dir. */
+    public static final String STATE_DIR = "state";
+
+    /** Ignite configuration. */
+    private final IgniteConfiguration igniteCfg;
+
+    /** Spring resource context. */
+    private final GridSpringResourceContext ctx;
+
+    /** Change Data Capture configuration. */
+    private final ChangeDataCaptureConfiguration cdcCfg;
+
+    /** WAL iterator factory. */
+    private final IgniteWalIteratorFactory factory;
+
+    /** Events consumer. */
+    private final WALRecordsConsumer<?, ?> consumer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Change Data Capture directory. */
+    private Path cdcDir;
+
+    /** Binary meta directory. */
+    private File binaryMeta;
+
+    /** Marshaller directory. */
+    private File marshaller;
+
+    /** Change Data Capture state. */
+    private ChangeDataCaptureConsumerState state;
+
+    /** Save state to start from. */
+    private WALPointer initState;
+
+    /** Stopped flag. */
+    private volatile boolean stopped;
+
+    /** Already processed segments. */
+    private final List<Path> processed = new ArrayList<>();
+
+    /**
+     * @param igniteCfg Ignite configuration.
+     * @param ctx Spring resource context.
+     * @param cdcCfg Change Data Capture configuration.
+     */
+    public ChangeDataCapture(
+        IgniteConfiguration igniteCfg,
+        GridSpringResourceContext ctx,
+        ChangeDataCaptureConfiguration cdcCfg) {
+        this.igniteCfg = new IgniteConfiguration(igniteCfg);
+        this.ctx = ctx;
+        this.cdcCfg = cdcCfg;
+
+        initWorkDir(this.igniteCfg);
+
+        log = logger(this.igniteCfg);
+
+        consumer = new WALRecordsConsumer<>(cdcCfg.getConsumer(), log);
+
+        factory = new IgniteWalIteratorFactory(log);
+    }
+
+    /** Runs Change Data Capture. */
+    @Override public void run() {
+        synchronized (this) {
+            if (stopped)
+                return;
+        }
+
+        try {
+            runX();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Runs Change Data Capture application with possible exception. */
+    public void runX() throws Exception {
+        if (!CU.isPersistenceEnabled(igniteCfg)) {
+            log.error(ERR_MSG);
+
+            throw new IllegalArgumentException(ERR_MSG);
+        }
+
+        PdsFolderSettings<ChangeDataCaptureFileLockHolder> settings =
+            new PdsFolderResolver<>(igniteCfg, log, null, 
this::tryLock).resolve();
+
+        if (settings == null)
+            throw new IgniteException("Can't find PDS folder!");

Review comment:
       Please mention the path

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/ChangeDataCapture.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import 
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.startup.cmdline.ChangeDataCaptureCommandLineStartup;
+
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+
+/**
+ * Change Data Capture(CDC) application.
+ * Application run independently of Ignite node process and provide ability 
for the {@link ChangeDataCaptureConsumer}
+ * to consume events({@link ChangeDataCaptureEvent}) from WAL segments.
+ * User should responsible {@link ChangeDataCaptureConsumer} implementation 
with custom consumption logic.
+ *
+ * Ignite node should be explicitly configured for using {@link 
ChangeDataCapture}.
+ * <ol>
+ *     <li>Set {@link 
DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)} to true.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setChangeDataCaptureWalPath(String)} to path to the 
directory
+ *     to store WAL segments for CDC.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout 
for
+ *     force WAL rollover, so new events will be available for consumptions 
with the predicted time.</li>
+ * </ol>
+ *
+ * When {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} is true 
then Ignite node on each WAL segment
+ * rollover creates hard link to archive WAL segment in
+ * {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} directory. 
{@link ChangeDataCapture} application takes
+ * segment file and consumes events from it.
+ * After successful consumption (see {@link 
ChangeDataCaptureConsumer#onEvents(Iterator)}) WAL segment will be deleted
+ * from directory.
+ *
+ * Several Ignite nodes can be started on the same host.
+ * If your deployment done with custom consistent id then you should specify 
it via
+ * {@link IgniteConfiguration#setConsistentId(Serializable)} in provided 
{@link IgniteConfiguration}.
+ *
+ * Application works as follows:
+ * <ol>
+ *     <li>Search node work directory based on provided {@link 
IgniteConfiguration}.</li>
+ *     <li>Await for creation of CDC directory if it not exists.</li>
+ *     <li>Acquire file lock to ensure exclusive consumption.</li>
+ *     <li>Loads state of consumption if it exists.</li>
+ *     <li>Infinetely wait for new available segment and process it.</li>
+ * </ol>
+ *
+ * @see DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)
+ * @see DataStorageConfiguration#setChangeDataCaptureWalPath(String)
+ * @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
+ * @see ChangeDataCaptureCommandLineStartup
+ * @see ChangeDataCaptureConsumer
+ * @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
+ */
+public class ChangeDataCapture implements Runnable {
+    /** */
+    public static final String ERR_MSG = "Persistence disabled. Capture Data 
Change can't run!";
+
+    /** State dir. */
+    public static final String STATE_DIR = "state";
+
+    /** Ignite configuration. */
+    private final IgniteConfiguration igniteCfg;
+
+    /** Spring resource context. */
+    private final GridSpringResourceContext ctx;
+
+    /** Change Data Capture configuration. */
+    private final ChangeDataCaptureConfiguration cdcCfg;
+
+    /** WAL iterator factory. */
+    private final IgniteWalIteratorFactory factory;
+
+    /** Events consumer. */
+    private final WALRecordsConsumer<?, ?> consumer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Change Data Capture directory. */
+    private Path cdcDir;
+
+    /** Binary meta directory. */
+    private File binaryMeta;
+
+    /** Marshaller directory. */
+    private File marshaller;
+
+    /** Change Data Capture state. */
+    private ChangeDataCaptureConsumerState state;
+
+    /** Save state to start from. */
+    private WALPointer initState;
+
+    /** Stopped flag. */
+    private volatile boolean stopped;
+
+    /** Already processed segments. */
+    private final List<Path> processed = new ArrayList<>();
+
+    /**
+     * @param igniteCfg Ignite configuration.
+     * @param ctx Spring resource context.
+     * @param cdcCfg Change Data Capture configuration.
+     */
+    public ChangeDataCapture(
+        IgniteConfiguration igniteCfg,
+        GridSpringResourceContext ctx,
+        ChangeDataCaptureConfiguration cdcCfg) {
+        this.igniteCfg = new IgniteConfiguration(igniteCfg);
+        this.ctx = ctx;
+        this.cdcCfg = cdcCfg;
+
+        initWorkDir(this.igniteCfg);
+
+        log = logger(this.igniteCfg);
+
+        consumer = new WALRecordsConsumer<>(cdcCfg.getConsumer(), log);
+
+        factory = new IgniteWalIteratorFactory(log);
+    }
+
+    /** Runs Change Data Capture. */
+    @Override public void run() {
+        synchronized (this) {
+            if (stopped)
+                return;
+        }
+
+        try {
+            runX();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Runs Change Data Capture application with possible exception. */
+    public void runX() throws Exception {
+        if (!CU.isPersistenceEnabled(igniteCfg)) {
+            log.error(ERR_MSG);
+
+            throw new IllegalArgumentException(ERR_MSG);
+        }
+
+        PdsFolderSettings<ChangeDataCaptureFileLockHolder> settings =
+            new PdsFolderResolver<>(igniteCfg, log, null, 
this::tryLock).resolve();
+
+        if (settings == null)
+            throw new IgniteException("Can't find PDS folder!");
+
+        ChangeDataCaptureFileLockHolder lock = 
settings.getLockedFileLockHolder();
+
+        if (lock == null) {
+            File consIdDir = new File(settings.persistentStoreRootPath(), 
settings.folderName());
+
+            lock = tryLock(consIdDir);
+
+            if (lock == null)
+                throw new IgniteException("Can't lock Change Data Capture dir 
" + consIdDir.getAbsolutePath());
+        }
+
+        try {
+            String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 
1).toString();
+
+            Files.createDirectories(cdcDir.resolve(STATE_DIR));
+
+            binaryMeta = 
CacheObjectBinaryProcessorImpl.binaryWorkDir(igniteCfg.getWorkDirectory(), 
consIdDir);
+
+            marshaller = 
MarshallerContextImpl.mappingFileStoreWorkDir(igniteCfg.getWorkDirectory());
+
+            injectResources(consumer.consumer());
+
+            ackAsciiLogo();
+
+            state = new 
ChangeDataCaptureConsumerState(cdcDir.resolve(STATE_DIR));
+
+            initState = state.load();
+
+            if (initState != null && log.isInfoEnabled())
+                log.info("Initial state loaded [state=" + initState + ']');
+
+            consumer.start();
+
+            try {
+                consumeWalSegmentsUntilStopped();
+            }
+            finally {
+                consumer.stop();
+
+                if (log.isInfoEnabled())
+                    log.info("Ignite Change Data Capture Application stoped.");
+            }
+        }
+        finally {
+            U.closeQuiet(lock);
+        }
+    }
+
+    /** Waits and consumes new WAL segments until stoped. */
+    public void consumeWalSegmentsUntilStopped() {
+        try {
+            Set<Path> seen = new HashSet<>();
+
+            AtomicLong lastSgmnt = new AtomicLong(-1);
+
+            while (!stopped) {
+                try (Stream<Path> cdcFiles = Files.walk(cdcDir, 1)) {
+                    Set<Path> exists = new HashSet<>();
+
+                    cdcFiles
+                        .peek(exists::add) // Store files that exists in cdc 
dir.
+                        // Need unseen WAL segments only.
+                        .filter(p -> 
WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
+                        .peek(seen::add) // Adds to seen.
+                        .sorted(Comparator.comparingLong(this::segmentIndex)) 
// Sort by segment index.
+                        .peek(p -> {
+                            long nextSgmnt = segmentIndex(p);
+
+                            assert lastSgmnt.get() == -1 || nextSgmnt - 
lastSgmnt.get() == 1;
+
+                            lastSgmnt.set(nextSgmnt);
+                        })
+                        .forEach(this::consumeSegment); // Consuming segments.
+
+                    seen.removeIf(p -> !exists.contains(p)); // Clean up seen 
set.
+                }
+
+                if (!stopped)
+                    U.sleep(cdcCfg.getCheckFrequency());
+            }
+        }
+        catch (IOException | IgniteInterruptedCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Reads all available records from segment. */
+    private void consumeSegment(Path segment) {
+        if (log.isInfoEnabled())
+            log.info("Processing WAL segment[segment=" + segment + ']');
+
+        IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .log(log)
+                .binaryMetadataFileStoreDir(binaryMeta)
+                .marshallerMappingFileStoreDir(marshaller)
+                .keepBinary(cdcCfg.isKeepBinary())
+                .filesOrDirs(segment.toFile())
+                .addFilter((type, ptr) -> type == DATA_RECORD_V2);
+
+        if (initState != null) {
+            long segmentIdx = segmentIndex(segment);
+
+            if (segmentIdx > initState.index()) {
+                log.error("Found segment greater then saved state. Some events 
are missed. Exiting!" +
+                    "[state=" + initState + ",segment=" + segmentIdx + ']');
+
+                throw new IgniteException("Some data missed.");
+            }
+
+            if (segmentIdx < initState.index()) {
+                if (log.isInfoEnabled()) {
+                    log.info("Already processed segment found. Skipping and 
deleting the file [segment=" +
+                        segmentIdx + ",state=" + initState.index() + ']');
+                }
+
+                // WAL segment is a hard link to a segment file in the special 
Change Data Capture folder.
+                // So, we can safely delete it after processing.
+                try {
+                    Files.delete(segment);
+
+                    return;
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            builder.from(initState);
+
+            initState = null;
+        }
+
+        try (WALIterator it = factory.iterator(builder)) {
+            while (it.hasNext()) {
+                boolean commit = consumer.onRecords(F.iterator(it.iterator(), 
IgniteBiTuple::get2, true));
+
+                if (commit) {
+                    assert it.lastRead().isPresent();
+
+                    state.save(it.lastRead().get());
+
+                    // Can delete after new file state save.
+                    if (!processed.isEmpty()) {
+                        // WAL segment is a hard link to a segment file in a 
specifal Change Data Capture folder.
+                        // So we can safely delete it after success processing.
+                        for (Path prevSegment : processed)
+                            Files.delete(prevSegment);
+
+                        processed.clear();
+                    }
+                }
+            }
+        } catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+
+        processed.add(segment);
+    }
+
+    /**
+     * Try locks Change Data Capture directory.
+     *
+     * @param dbStoreDirWithSubdirectory Root PDS directory.
+     * @return Lock or null if lock failed.
+     */
+    private ChangeDataCaptureFileLockHolder tryLock(File 
dbStoreDirWithSubdirectory) {
+        if (!dbStoreDirWithSubdirectory.exists()) {
+            log.warning(dbStoreDirWithSubdirectory + " not exists.");
+
+            return null;
+        }
+
+        File cdcRoot = new 
File(igniteCfg.getDataStorageConfiguration().getChangeDataCaptureWalPath());
+
+        if (!cdcRoot.isAbsolute()) {
+            cdcRoot = new File(
+                igniteCfg.getWorkDirectory(),
+                
igniteCfg.getDataStorageConfiguration().getChangeDataCaptureWalPath()
+            );
+        }
+
+        if (!cdcRoot.exists()) {
+            log.warning(cdcRoot + " not exists. Should be created by Ignite 
Node. " +
+                "Is Change Data Capture enabled in IgniteConfiguration?");
+
+            return null;
+        }
+
+        Path cdcDir = Paths.get(cdcRoot.getAbsolutePath(), 
dbStoreDirWithSubdirectory.getName());
+
+        if (!Files.exists(cdcDir)) {
+            log.warning(cdcRoot + " not exists. Should be create by Ignite 
Node. " +

Review comment:
       cdcDir?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/startup/cmdline/ChangeDataCaptureCommandLineStartup.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.startup.cmdline;
+
+import java.net.URL;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureLoader;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.X;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PROG_NAME;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.ignite.startup.cmdline.CommandLineStartup.isHelp;
+
+/**
+ * This class defines command-line Ignite Capture Data Change startup. This 
startup can be used to start Ignite
+ * Capture Data Change application outside of any hosting environment from 
command line.
+ * This startup is a Java application with {@link #main(String[])} method that 
accepts command line arguments.
+ * It accepts just one parameter which is Spring XML configuration file path.
+ * You can run this class from command line without parameters to get help 
message.
+ * <p>
+ * Note that scripts {@code ${IGNITE_HOME}/bin/cdc.{sh|bat}} shipped with 
Ignite use
+ * this startup and you can use them as an example.
+ * <p>
+ *
+ * @see ChangeDataCapture
+ */
+public class ChangeDataCaptureCommandLineStartup {
+    /** Quite log flag. */
+    private static final boolean QUITE = 
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_QUIET);
+
+    /**
+     * Main entry point.
+     *
+     * @param args Command line arguments.
+     */
+    public static void main(String[] args) {
+        if (!QUITE) {
+            X.println("Ignite CDC Command Line Startup, ver. " + ACK_VER_STR);
+            X.println(COPYRIGHT);
+            X.println();
+        }
+
+        if (args.length > 1)
+            exit("Too many arguments.", true, -1);
+
+        if (args.length > 0 && isHelp(args[0]))
+            exit(null, true, 0);
+
+        if (args.length > 0 && args[0].isEmpty())
+            exit("Empty argument.", true, 1);
+
+        if (args.length > 0 && args[0].charAt(0) == '-')
+            exit("Invalid arguments: " + args[0], true, -1);
+
+        try {
+            ChangeDataCapture cdc = 
ChangeDataCaptureLoader.loadChangeDataCapture(args[0]);
+
+            if (!IgniteSystemProperties.getBoolean(IGNITE_NO_SHUTDOWN_HOOK, 
false)) {

Review comment:
       Not sure we should change this, but we may want to have different 
SIGTERM strategies for ignite and cdcd.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/startup/cmdline/ChangeDataCaptureCommandLineStartup.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.startup.cmdline;
+
+import java.net.URL;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureLoader;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.X;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PROG_NAME;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.ignite.startup.cmdline.CommandLineStartup.isHelp;
+
+/**
+ * This class defines command-line Ignite Capture Data Change startup. This 
startup can be used to start Ignite
+ * Capture Data Change application outside of any hosting environment from 
command line.
+ * This startup is a Java application with {@link #main(String[])} method that 
accepts command line arguments.
+ * It accepts just one parameter which is Spring XML configuration file path.
+ * You can run this class from command line without parameters to get help 
message.
+ * <p>
+ * Note that scripts {@code ${IGNITE_HOME}/bin/cdc.{sh|bat}} shipped with 
Ignite use
+ * this startup and you can use them as an example.
+ * <p>
+ *
+ * @see ChangeDataCapture
+ */
+public class ChangeDataCaptureCommandLineStartup {
+    /** Quite log flag. */
+    private static final boolean QUITE = 
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_QUIET);
+
+    /**
+     * Main entry point.
+     *
+     * @param args Command line arguments.
+     */
+    public static void main(String[] args) {
+        if (!QUITE) {
+            X.println("Ignite CDC Command Line Startup, ver. " + ACK_VER_STR);

Review comment:
       "Change Data Capture" because will be read by the user.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/ChangeDataCapture.java
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import 
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.startup.cmdline.ChangeDataCaptureCommandLineStartup;
+
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+
+/**
+ * CDC(Change Data Capture) application.
+ * Application run independently of Ignite node process and provide ability 
for the {@link ChangeDataCaptureConsumer}
+ * to consume events({@link ChangeDataCaptureEvent}) from WAL segments.
+ * User should responsible {@link ChangeDataCaptureConsumer} implementation 
with custom consumption logic.
+ *
+ * Ignite node should be explicitly configured for using {@link 
ChangeDataCapture}.
+ * <ol>
+ *     <li>Set {@link DataStorageConfiguration#setCdcEnabled(boolean)} to 
true.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setCdcWalPath(String)} to path to the directory to 
store
+ *     WAL segments for CDC.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout 
for
+ *     force WAL rollover, so new events will be available for consumptions 
with the predicted time.</li>
+ * </ol>
+ *
+ * When {@link DataStorageConfiguration#getCdcWalPath()} is true then Ignite 
node on each WAL segment rollover creates
+ * hard link to archive WAL segment in {@link 
DataStorageConfiguration#getCdcWalPath()} directory.
+ * {@link ChangeDataCapture} application takes segment file and consumes 
events from it.
+ * After successful consumption (see {@link 
ChangeDataCaptureConsumer#onEvents(Iterator)})
+ * WAL segment will be deleted from directory.
+ *
+ * Several Ignite nodes can be started on the same host.
+ * If your deployment done with custom consistent id then you should specify 
it via
+ * {@link IgniteConfiguration#setConsistentId(Serializable)} in provided 
{@link IgniteConfiguration}.
+ *
+ * Application works as follows:
+ * <ol>
+ *     <li>Search node work directory based on provided {@link 
IgniteConfiguration}.</li>
+ *     <li>Await for creation of CDC directory if it not exists.</li>
+ *     <li>Acquire file lock to ensure exclusive consumption.</li>
+ *     <li>Loads state of consumption if it exists.</li>
+ *     <li>Infinetely wait for new available segment and process it.</li>
+ * </ol>
+ *
+ * @see DataStorageConfiguration#setCdcEnabled(boolean)
+ * @see DataStorageConfiguration#setCdcWalPath(String)
+ * @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
+ * @see ChangeDataCaptureCommandLineStartup
+ * @see ChangeDataCaptureConsumer
+ * @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
+ */
+public class ChangeDataCapture implements Runnable {
+    /** */
+    public static final String ERR_MSG = "Persistence disabled. Capture Data 
Change can't run!";
+
+    /** State dir. */
+    public static final String STATE_DIR = "state";
+
+    /** Ignite configuration. */
+    private final IgniteConfiguration cfg;
+
+    /** Spring resource context. */
+    private final GridSpringResourceContext ctx;
+
+    /** CDC configuration. */
+    private final ChangeDataCaptureConfiguration cdcCfg;
+
+    /** WAL iterator factory. */
+    private final IgniteWalIteratorFactory factory;
+
+    /** Events consumer. */
+    private final WALRecordsConsumer<?, ?> consumer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** CDC directory. */
+    private Path cdcDir;
+
+    /** Binary meta directory. */
+    private File binaryMeta;
+
+    /** Marshaller directory. */
+    private File marshaller;
+
+    /** CDC state. */
+    private ChangeDataCaptureConsumerState state;
+
+    /** Save state to start from. */
+    private WALPointer initState;
+
+    /** Stopped flag. */
+    private volatile boolean stopped;
+
+    /** Previous segments. */
+    private final List<Path> prevSegments = new ArrayList<>();
+
+    /**
+     * @param cfg Ignite configuration.
+     * @param ctx Spring resource context.
+     * @param cdcCfg CDC configuration.
+     */
+    public ChangeDataCapture(
+        IgniteConfiguration cfg,
+        GridSpringResourceContext ctx,
+        ChangeDataCaptureConfiguration cdcCfg) {
+        this.cfg = new IgniteConfiguration(cfg);
+        this.ctx = ctx;
+        this.cdcCfg = cdcCfg;
+
+        consumer = new WALRecordsConsumer<>(cdcCfg.getConsumer());
+
+        initWorkDir(this.cfg);
+
+        log = logger(this.cfg);
+
+        factory = new IgniteWalIteratorFactory(log);
+    }
+
+    /** Runs CDC. */
+    @Override public void run() {
+        synchronized (this) {
+            if (stopped)
+                return;
+        }
+
+        try {
+            runX();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Runs CDC application with possible exception. */
+    public void runX() throws Exception {
+        if (!CU.isPersistenceEnabled(cfg)) {
+            log.error(ERR_MSG);
+
+            throw new IllegalArgumentException(ERR_MSG);
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting Ignite CDC Application.");
+            log.info("Consumer     -\t" + consumer.toString());
+            log.info("ConsistentId -\t" + cfg.getConsistentId());
+        }
+
+        PdsFolderSettings<ChangeDataCaptureFileLockHolder> settings =
+            new PdsFolderResolver<>(cfg, log, null, this::tryLock).resolve();
+
+        if (settings == null)
+            throw new RuntimeException("Can't find PDS folder!");
+
+        ChangeDataCaptureFileLockHolder lock = 
settings.getLockedFileLockHolder();
+
+        if (lock == null) {
+            File consIdDir = new File(settings.persistentStoreRootPath(), 
settings.folderName());
+
+            lock = tryLock(consIdDir);
+
+            if (lock == null)
+                throw new RuntimeException("Can't lock CDC dir " + 
settings.consistentId());
+        }
+
+        try {
+            init();
+
+            if (log.isInfoEnabled()) {
+                log.info("CDC dir     -\t" + cdcDir);
+                log.info("Binary meta -\t" + binaryMeta);
+                log.info("Marshaller  -\t" + marshaller);
+                log.info("--------------------------------");
+            }
+
+            state = new 
ChangeDataCaptureConsumerState(cdcDir.resolve(STATE_DIR));
+
+            initState = state.load();
+
+            if (initState != null && log.isInfoEnabled())
+                log.info("Loaded initial state[state=" + initState + ']');
+
+            consumer.start(log);
+
+            try {
+                consumeWalSegmentsUntilStopped();
+            }
+            finally {
+                consumer.stop();
+
+                if (log.isInfoEnabled())
+                    log.info("Ignite CDC Application stoped.");
+            }
+        }
+        finally {
+            U.closeQuiet(lock);
+        }
+    }
+
+    /** Searches required directories. */
+    private void init() throws IOException, IgniteCheckedException {
+        String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 
1).toString();
+
+        Files.createDirectories(cdcDir.resolve(STATE_DIR));
+
+        binaryMeta = 
CacheObjectBinaryProcessorImpl.binaryWorkDir(cfg.getWorkDirectory(), consIdDir);
+
+        marshaller = 
MarshallerContextImpl.mappingFileStoreWorkDir(cfg.getWorkDirectory());
+
+        if (log.isDebugEnabled()) {
+            log.debug("Using BinaryMeta directory[dir=" + binaryMeta + ']');
+            log.debug("Using Marshaller directory[dir=" + marshaller + ']');
+        }
+
+        injectResources(consumer.getCdcConsumer());
+    }
+
+    /** Waits and consumes new WAL segments until stoped. */
+    public void consumeWalSegmentsUntilStopped() {
+        try {
+            Set<Path> seen = new HashSet<>();
+
+            AtomicLong lastSgmnt = new AtomicLong(-1);
+
+            while (!stopped) {
+                try (Stream<Path> cdcFiles = Files.walk(cdcDir, 1)) {
+                    Set<Path> exists = new HashSet<>();
+
+                    cdcFiles
+                        .peek(exists::add) // Store files that exists in cdc 
dir.
+                        // Need unseen WAL segments only.
+                        .filter(p -> 
WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
+                        .peek(seen::add) // Adds to seen.
+                        .sorted(Comparator.comparingLong(this::segmentIndex)) 
// Sort by segment index.
+                        .peek(p -> {
+                            long nextSgmnt = segmentIndex(p);
+
+                            assert lastSgmnt.get() == -1 || nextSgmnt - 
lastSgmnt.get() == 1;
+
+                            lastSgmnt.set(nextSgmnt);
+                        })
+                        .forEach(this::consumeSegment); // Consuming segments.
+
+                    seen.removeIf(p -> !exists.contains(p)); // Clean up seen 
set.
+                }
+
+                if (!stopped)
+                    U.sleep(cdcCfg.getCheckFrequency());
+            }
+        }
+        catch (IOException | IgniteInterruptedCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Reads all available records from segment. */
+    private void consumeSegment(Path segment) {
+        if (log.isInfoEnabled())
+            log.info("Processing WAL segment[segment=" + segment + ']');
+
+        IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .log(log)
+                .binaryMetadataFileStoreDir(binaryMeta)
+                .marshallerMappingFileStoreDir(marshaller)
+                .keepBinary(cdcCfg.isKeepBinary())
+                .filesOrDirs(segment.toFile())
+                .addFilter((type, ptr) -> type == DATA_RECORD_V2);
+
+        if (initState != null) {
+            long segmentIdx = segmentIndex(segment);
+
+            if (segmentIdx > initState.index()) {
+                log.error("Found segment greater then saved state. Some events 
are missed. Exiting!" +
+                    "[state=" + initState + ",segment=" + segmentIdx + ']');
+
+                throw new IgniteException("Some data missed.");
+            }
+
+            if (segmentIdx < initState.index()) {
+                if (log.isInfoEnabled()) {
+                    log.info("Deleting segment. Saved state has greater 
index.[segment=" +
+                        segmentIdx + ",state=" + initState.index() + ']');
+                }
+
+                // WAL segment is a hard link to a segment file in the special 
CDC folder.
+                // So, we can safely delete it after processing.
+                try {
+                    Files.delete(segment);
+
+                    return;
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            builder.from(initState);
+
+            initState = null;
+        }
+
+        try (WALIterator it = factory.iterator(builder)) {
+            while (it.hasNext()) {
+                boolean commit = consumer.onRecords(F.iterator(it.iterator(), 
IgniteBiTuple::get2, true));
+
+                if (commit) {
+                    assert it.lastRead().isPresent();
+
+                    state.save(it.lastRead().get());
+
+                    // Can delete after new file state save.
+                    if (!prevSegments.isEmpty()) {
+                        // WAL segment is a hard link to a segment file in a 
specifal CDC folder.
+                        // So we can safely delete it after success processing.
+                        for (Path prevSegment : prevSegments)
+                            Files.delete(prevSegment);
+
+                        prevSegments.clear();
+                    }
+                }
+            }
+        } catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+
+        prevSegments.add(segment);
+    }
+
+    /**
+     * Try locks CDC directory.
+     *
+     * @param dbStoreDirWithSubdirectory Root PDS directory.
+     * @return Lock or null if lock failed.
+     */
+    private ChangeDataCaptureFileLockHolder tryLock(File 
dbStoreDirWithSubdirectory) {
+        if (!dbStoreDirWithSubdirectory.exists()) {
+            log.warning(dbStoreDirWithSubdirectory + " not exists.");

Review comment:
       See no fixes, reopening.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/startup/cmdline/ChangeDataCaptureCommandLineStartup.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.startup.cmdline;
+
+import java.net.URL;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureLoader;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.X;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_SHUTDOWN_HOOK;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PROG_NAME;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.ignite.startup.cmdline.CommandLineStartup.isHelp;
+
+/**
+ * This class defines command-line Ignite Capture Data Change startup. This 
startup can be used to start Ignite
+ * Capture Data Change application outside of any hosting environment from 
command line.
+ * This startup is a Java application with {@link #main(String[])} method that 
accepts command line arguments.
+ * It accepts just one parameter which is Spring XML configuration file path.
+ * You can run this class from command line without parameters to get help 
message.
+ * <p>
+ * Note that scripts {@code ${IGNITE_HOME}/bin/cdc.{sh|bat}} shipped with 
Ignite use
+ * this startup and you can use them as an example.
+ * <p>
+ *
+ * @see ChangeDataCapture
+ */
+public class ChangeDataCaptureCommandLineStartup {
+    /** Quite log flag. */
+    private static final boolean QUITE = 
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_QUIET);
+
+    /**
+     * Main entry point.
+     *
+     * @param args Command line arguments.
+     */
+    public static void main(String[] args) {
+        if (!QUITE) {
+            X.println("Ignite CDC Command Line Startup, ver. " + ACK_VER_STR);
+            X.println(COPYRIGHT);
+            X.println();
+        }
+
+        if (args.length > 1)
+            exit("Too many arguments.", true, -1);
+
+        if (args.length > 0 && isHelp(args[0]))
+            exit(null, true, 0);
+
+        if (args.length > 0 && args[0].isEmpty())
+            exit("Empty argument.", true, 1);
+
+        if (args.length > 0 && args[0].charAt(0) == '-')
+            exit("Invalid arguments: " + args[0], true, -1);
+
+        try {
+            ChangeDataCapture cdc = 
ChangeDataCaptureLoader.loadChangeDataCapture(args[0]);
+
+            if (!IgniteSystemProperties.getBoolean(IGNITE_NO_SHUTDOWN_HOOK, 
false)) {
+                Runtime.getRuntime().addShutdownHook(new 
Thread("cdc-shutdown-hook") {
+                    @Override public void run() {
+                        cdc.stop();
+                    }
+                });
+            }
+
+            Thread appThread = new Thread(cdc);
+
+            appThread.start();
+
+            appThread.join();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            String note = "";
+
+            if (X.hasCause(e, ClassNotFoundException.class))
+                note = "\nNote! You may use 'USER_LIBS' environment variable 
to specify your classpath.";
+
+            exit("Failed to run CDC: " + e.getMessage() + note, false, -1);
+        }
+    }
+
+    /**
+     * @param cfgUrl String configuration URL.
+     * @param spring Ignite spring helper.
+     * @return CDC consumer defined in spring configuration.
+     * @throws IgniteCheckedException
+     */
+    private static ChangeDataCaptureConfiguration consumerConfig(
+        URL cfgUrl,
+        IgniteSpringHelper spring
+    ) throws IgniteCheckedException {
+        Map<Class<?>, Object> cdcCfgs = spring.loadBeans(cfgUrl, 
ChangeDataCaptureConfiguration.class);
+
+        if (cdcCfgs == null || cdcCfgs.size() != 1)
+            exit("Exact 1 CaptureDataChangeConfiguration configuration should 
be defined", false, 1);
+
+        return 
(ChangeDataCaptureConfiguration)cdcCfgs.values().iterator().next();
+    }
+
+    /**
+     * Exists with optional error message, usage show and exit code.
+     *
+     * @param errMsg Optional error message.
+     * @param showUsage Whether or not to show usage information.
+     * @param exitCode Exit code.
+     */
+    private static void exit(@Nullable String errMsg, boolean showUsage, int 
exitCode) {
+        if (errMsg != null)
+            X.error(errMsg);
+
+        String runner = System.getProperty(IGNITE_PROG_NAME, 
"ignite-cdc.{sh|bat}");

Review comment:
       IGNITE_PROG_NAME has no bearing on cdc {sh|bat}

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/WALRecordsConsumer.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.util.EnumSet;
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgnitePredicate;
+
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Transform {@link DataEntry} to {@link ChangeDataCaptureEvent} and sends it 
to {@link ChangeDataCaptureConsumer}.
+ *
+ * @see ChangeDataCapture
+ * @see ChangeDataCaptureConsumer
+ */
+public class WALRecordsConsumer<K, V> {
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Data change events consumer. */
+    private final ChangeDataCaptureConsumer consumer;
+
+    /** Operations types we interested in. */
+    private static final EnumSet<GridCacheOperation> OPERATIONS_TYPES = 
EnumSet.of(CREATE, UPDATE, DELETE, TRANSFORM);
+
+    /** Operations filter. */
+    private static final IgnitePredicate<? super DataEntry> OPERATIONS_FILTER 
= e -> {
+        if (!(e instanceof UnwrappedDataEntry))
+            throw new IllegalStateException("Unexpected data entry type[" + 
e.getClass().getName());

Review comment:
       incorrect output pattern

##########
File path: 
modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureOrderTest.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.TestCDCConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.User;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.plugin.AbstractCachePluginProvider;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.KEYS_CNT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.addAndWaitForConsumption;
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.cdcConfig;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.configuration.WALMode.FSYNC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class ChangeDataCaptureOrderTest extends GridCommonAbstractTest {
+    /** */
+    public static final String FOR_OTHER_DR_ID = "for-other-dr-id";
+
+    /** */
+    public static final byte OTHER_DR_ID = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        int segmentSz = 10 * 1024 * 1024;
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setChangeDataCaptureEnabled(true)
+            .setWalMode(FSYNC)
+            .setMaxWalArchiveSize(10 * segmentSz)
+            .setWalSegmentSize(segmentSz)
+            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)));
+
+        cfg.setPluginProviders(new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "ConflictResolverProvider";
+            }
+
+            @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+                if 
(!ctx.igniteCacheConfiguration().getName().equals(FOR_OTHER_DR_ID))
+                    return null;
+
+                return new AbstractCachePluginProvider() {
+                    @Override public @Nullable Object createComponent(Class 
cls) {
+                        if (cls != CacheConflictResolutionManager.class)
+                            return null;
+
+                        return new TestCacheConflictResolutionManager();
+                    }
+                };
+            }
+        });
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.beforeTest();
+    }
+
+    /** Simplest CDC test with usage of {@link 
IgniteInternalCache#putAllConflict(Map)}. */
+    @Test
+    public void testReadAllKeysWithOtherDc() throws Exception {
+        IgniteConfiguration cfg = getConfiguration("ignite-conflict-resolver");
+
+        IgniteEx ign = startGrid(cfg);
+
+        byte drId = (byte)1;
+
+        ign.context().cache().context().versions().dataCenterId(drId);
+
+        ign.cluster().state(ACTIVE);
+
+        TestCDCConsumer cnsmr = new TestCDCConsumer();
+
+        ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, 
cdcConfig(cnsmr));
+
+        IgniteCache<Integer, User> cache = 
ign.getOrCreateCache(FOR_OTHER_DR_ID);
+
+        cnsmr.drId = 1;
+        cnsmr.otherDrId = OTHER_DR_ID;
+
+        addAndWaitForConsumption(cnsmr, cdc, cache, null, 
this::addConflictData, 0, KEYS_CNT * 2, getTestTimeout());
+    }
+
+    /** */
+    @Test
+    public void testOrderIncrease() throws Exception {
+        IgniteConfiguration cfg = getConfiguration("ignite-0");
+
+        IgniteEx ign = startGrid(cfg);
+
+        ign.cluster().state(ACTIVE);
+
+        AtomicLong updCntr = new AtomicLong(0);
+        int key = 42;
+
+        ChangeDataCaptureConsumer cnsmr = new ChangeDataCaptureConsumer() {
+            private long order = -1;
+
+            @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> 
evts) {
+                evts.forEachRemaining(evt -> {
+                    assertEquals(key, evt.key());
+
+                    assertTrue(evt.version().order() > order);
+
+                    order = evt.version().order();
+
+                    updCntr.incrementAndGet();
+                });
+
+                return true;
+            }
+
+            @Override public void start() {
+                // No-op.
+            }
+
+            @Override public void stop() {
+                // No-op.
+            }
+        };
+
+        ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, 
cdcConfig(cnsmr));
+
+        IgniteCache<Integer, User> cache = ign.getOrCreateCache("my-cache");
+
+        IgniteInternalFuture<?> fut = runAsync(cdc);
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.put(key, new User("John Connor " + i, 42 + i, null));
+
+        assertTrue(waitForCondition(() -> updCntr.get() == KEYS_CNT, 
getTestTimeout()));
+
+        fut.cancel();
+    }
+
+    /** */
+    private void addConflictData(IgniteCache<Integer, User> cache, int from, 
int to) {
+        try {
+            IgniteEx ign = (IgniteEx)G.allGrids().get(0);
+
+            IgniteInternalCache intCache = ign.cachex(cache.getName());
+
+            Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();
+
+            for (int i = from; i < to; i++) {
+                byte[] bytes = new byte[1024];
+
+                ThreadLocalRandom.current().nextBytes(bytes);
+
+                KeyCacheObject key = new KeyCacheObjectImpl(i, null, 
intCache.affinity().partition(i));
+                CacheObject val =
+                    new CacheObjectImpl(new User("John Connor " + i, 42 + i, 
bytes), null);
+
+                val.prepareMarshal(intCache.context().cacheObjectContext());
+
+                drMap.put(key, new GridCacheDrInfo(val, new 
GridCacheVersion(1, i, 1, OTHER_DR_ID)));
+            }
+
+            intCache.putAllConflict(drMap);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    public static class TestCacheConflictResolutionManager<K, V> extends 
GridCacheManagerAdapter<K, V>
+        implements CacheConflictResolutionManager<K, V> {
+        @Override public CacheVersionConflictResolver conflictResolver() {

Review comment:
       javadoc is absent

##########
File path: 
modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureOrderTest.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.TestCDCConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.User;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.plugin.AbstractCachePluginProvider;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.KEYS_CNT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.addAndWaitForConsumption;
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.cdcConfig;

Review comment:
       references to another test that has no relations with the current

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/ChangeDataCapture.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import 
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.startup.cmdline.ChangeDataCaptureCommandLineStartup;
+
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+
+/**
+ * Change Data Capture(CDC) application.
+ * Application run independently of Ignite node process and provide ability 
for the {@link ChangeDataCaptureConsumer}
+ * to consume events({@link ChangeDataCaptureEvent}) from WAL segments.
+ * User should responsible {@link ChangeDataCaptureConsumer} implementation 
with custom consumption logic.
+ *
+ * Ignite node should be explicitly configured for using {@link 
ChangeDataCapture}.
+ * <ol>
+ *     <li>Set {@link 
DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)} to true.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setChangeDataCaptureWalPath(String)} to path to the 
directory
+ *     to store WAL segments for CDC.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout 
for
+ *     force WAL rollover, so new events will be available for consumptions 
with the predicted time.</li>
+ * </ol>
+ *
+ * When {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} is true 
then Ignite node on each WAL segment
+ * rollover creates hard link to archive WAL segment in
+ * {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} directory. 
{@link ChangeDataCapture} application takes
+ * segment file and consumes events from it.
+ * After successful consumption (see {@link 
ChangeDataCaptureConsumer#onEvents(Iterator)}) WAL segment will be deleted
+ * from directory.
+ *
+ * Several Ignite nodes can be started on the same host.
+ * If your deployment done with custom consistent id then you should specify 
it via
+ * {@link IgniteConfiguration#setConsistentId(Serializable)} in provided 
{@link IgniteConfiguration}.
+ *
+ * Application works as follows:
+ * <ol>
+ *     <li>Search node work directory based on provided {@link 
IgniteConfiguration}.</li>
+ *     <li>Await for creation of CDC directory if it not exists.</li>
+ *     <li>Acquire file lock to ensure exclusive consumption.</li>
+ *     <li>Loads state of consumption if it exists.</li>
+ *     <li>Infinetely wait for new available segment and process it.</li>
+ * </ol>
+ *
+ * @see DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)
+ * @see DataStorageConfiguration#setChangeDataCaptureWalPath(String)
+ * @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
+ * @see ChangeDataCaptureCommandLineStartup
+ * @see ChangeDataCaptureConsumer
+ * @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
+ */
+public class ChangeDataCapture implements Runnable {
+    /** */
+    public static final String ERR_MSG = "Persistence disabled. Capture Data 
Change can't run!";
+
+    /** State dir. */
+    public static final String STATE_DIR = "state";
+
+    /** Ignite configuration. */
+    private final IgniteConfiguration igniteCfg;
+
+    /** Spring resource context. */
+    private final GridSpringResourceContext ctx;
+
+    /** Change Data Capture configuration. */
+    private final ChangeDataCaptureConfiguration cdcCfg;
+
+    /** WAL iterator factory. */
+    private final IgniteWalIteratorFactory factory;
+
+    /** Events consumer. */
+    private final WALRecordsConsumer<?, ?> consumer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Change Data Capture directory. */
+    private Path cdcDir;
+
+    /** Binary meta directory. */
+    private File binaryMeta;
+
+    /** Marshaller directory. */
+    private File marshaller;
+
+    /** Change Data Capture state. */
+    private ChangeDataCaptureConsumerState state;
+
+    /** Save state to start from. */
+    private WALPointer initState;
+
+    /** Stopped flag. */
+    private volatile boolean stopped;
+
+    /** Already processed segments. */
+    private final List<Path> processed = new ArrayList<>();
+
+    /**
+     * @param igniteCfg Ignite configuration.
+     * @param ctx Spring resource context.
+     * @param cdcCfg Change Data Capture configuration.
+     */
+    public ChangeDataCapture(
+        IgniteConfiguration igniteCfg,
+        GridSpringResourceContext ctx,
+        ChangeDataCaptureConfiguration cdcCfg) {
+        this.igniteCfg = new IgniteConfiguration(igniteCfg);
+        this.ctx = ctx;
+        this.cdcCfg = cdcCfg;
+
+        initWorkDir(this.igniteCfg);
+
+        log = logger(this.igniteCfg);
+
+        consumer = new WALRecordsConsumer<>(cdcCfg.getConsumer(), log);
+
+        factory = new IgniteWalIteratorFactory(log);
+    }
+
+    /** Runs Change Data Capture. */
+    @Override public void run() {
+        synchronized (this) {
+            if (stopped)
+                return;
+        }
+
+        try {
+            runX();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Runs Change Data Capture application with possible exception. */
+    public void runX() throws Exception {
+        if (!CU.isPersistenceEnabled(igniteCfg)) {
+            log.error(ERR_MSG);
+
+            throw new IllegalArgumentException(ERR_MSG);
+        }
+
+        PdsFolderSettings<ChangeDataCaptureFileLockHolder> settings =
+            new PdsFolderResolver<>(igniteCfg, log, null, 
this::tryLock).resolve();
+
+        if (settings == null)
+            throw new IgniteException("Can't find PDS folder!");
+
+        ChangeDataCaptureFileLockHolder lock = 
settings.getLockedFileLockHolder();
+
+        if (lock == null) {
+            File consIdDir = new File(settings.persistentStoreRootPath(), 
settings.folderName());
+
+            lock = tryLock(consIdDir);
+
+            if (lock == null)
+                throw new IgniteException("Can't lock Change Data Capture dir 
" + consIdDir.getAbsolutePath());
+        }
+
+        try {
+            String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 
1).toString();
+
+            Files.createDirectories(cdcDir.resolve(STATE_DIR));
+
+            binaryMeta = 
CacheObjectBinaryProcessorImpl.binaryWorkDir(igniteCfg.getWorkDirectory(), 
consIdDir);
+
+            marshaller = 
MarshallerContextImpl.mappingFileStoreWorkDir(igniteCfg.getWorkDirectory());
+
+            injectResources(consumer.consumer());
+
+            ackAsciiLogo();
+
+            state = new 
ChangeDataCaptureConsumerState(cdcDir.resolve(STATE_DIR));
+
+            initState = state.load();
+
+            if (initState != null && log.isInfoEnabled())
+                log.info("Initial state loaded [state=" + initState + ']');
+
+            consumer.start();
+
+            try {
+                consumeWalSegmentsUntilStopped();
+            }
+            finally {
+                consumer.stop();
+
+                if (log.isInfoEnabled())
+                    log.info("Ignite Change Data Capture Application stoped.");
+            }
+        }
+        finally {
+            U.closeQuiet(lock);
+        }
+    }
+
+    /** Waits and consumes new WAL segments until stoped. */
+    public void consumeWalSegmentsUntilStopped() {
+        try {
+            Set<Path> seen = new HashSet<>();
+
+            AtomicLong lastSgmnt = new AtomicLong(-1);
+
+            while (!stopped) {
+                try (Stream<Path> cdcFiles = Files.walk(cdcDir, 1)) {
+                    Set<Path> exists = new HashSet<>();
+
+                    cdcFiles
+                        .peek(exists::add) // Store files that exists in cdc 
dir.
+                        // Need unseen WAL segments only.
+                        .filter(p -> 
WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
+                        .peek(seen::add) // Adds to seen.
+                        .sorted(Comparator.comparingLong(this::segmentIndex)) 
// Sort by segment index.
+                        .peek(p -> {
+                            long nextSgmnt = segmentIndex(p);
+
+                            assert lastSgmnt.get() == -1 || nextSgmnt - 
lastSgmnt.get() == 1;
+
+                            lastSgmnt.set(nextSgmnt);
+                        })
+                        .forEach(this::consumeSegment); // Consuming segments.
+
+                    seen.removeIf(p -> !exists.contains(p)); // Clean up seen 
set.
+                }
+
+                if (!stopped)
+                    U.sleep(cdcCfg.getCheckFrequency());
+            }
+        }
+        catch (IOException | IgniteInterruptedCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Reads all available records from segment. */
+    private void consumeSegment(Path segment) {
+        if (log.isInfoEnabled())
+            log.info("Processing WAL segment[segment=" + segment + ']');
+
+        IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .log(log)
+                .binaryMetadataFileStoreDir(binaryMeta)
+                .marshallerMappingFileStoreDir(marshaller)
+                .keepBinary(cdcCfg.isKeepBinary())
+                .filesOrDirs(segment.toFile())
+                .addFilter((type, ptr) -> type == DATA_RECORD_V2);
+
+        if (initState != null) {
+            long segmentIdx = segmentIndex(segment);
+
+            if (segmentIdx > initState.index()) {
+                log.error("Found segment greater then saved state. Some events 
are missed. Exiting!" +
+                    "[state=" + initState + ",segment=" + segmentIdx + ']');
+
+                throw new IgniteException("Some data missed.");
+            }
+
+            if (segmentIdx < initState.index()) {
+                if (log.isInfoEnabled()) {
+                    log.info("Already processed segment found. Skipping and 
deleting the file [segment=" +
+                        segmentIdx + ",state=" + initState.index() + ']');
+                }
+
+                // WAL segment is a hard link to a segment file in the special 
Change Data Capture folder.
+                // So, we can safely delete it after processing.
+                try {
+                    Files.delete(segment);
+
+                    return;
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            builder.from(initState);
+
+            initState = null;
+        }
+
+        try (WALIterator it = factory.iterator(builder)) {
+            while (it.hasNext()) {
+                boolean commit = consumer.onRecords(F.iterator(it.iterator(), 
IgniteBiTuple::get2, true));
+
+                if (commit) {
+                    assert it.lastRead().isPresent();
+
+                    state.save(it.lastRead().get());
+
+                    // Can delete after new file state save.
+                    if (!processed.isEmpty()) {
+                        // WAL segment is a hard link to a segment file in a 
specifal Change Data Capture folder.
+                        // So we can safely delete it after success processing.
+                        for (Path prevSegment : processed)
+                            Files.delete(prevSegment);
+
+                        processed.clear();
+                    }
+                }
+            }
+        } catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+
+        processed.add(segment);
+    }
+
+    /**
+     * Try locks Change Data Capture directory.
+     *
+     * @param dbStoreDirWithSubdirectory Root PDS directory.
+     * @return Lock or null if lock failed.
+     */
+    private ChangeDataCaptureFileLockHolder tryLock(File 
dbStoreDirWithSubdirectory) {
+        if (!dbStoreDirWithSubdirectory.exists()) {
+            log.warning(dbStoreDirWithSubdirectory + " not exists.");
+
+            return null;
+        }
+
+        File cdcRoot = new 
File(igniteCfg.getDataStorageConfiguration().getChangeDataCaptureWalPath());
+
+        if (!cdcRoot.isAbsolute()) {
+            cdcRoot = new File(
+                igniteCfg.getWorkDirectory(),
+                
igniteCfg.getDataStorageConfiguration().getChangeDataCaptureWalPath()
+            );
+        }
+
+        if (!cdcRoot.exists()) {
+            log.warning(cdcRoot + " not exists. Should be created by Ignite 
Node. " +
+                "Is Change Data Capture enabled in IgniteConfiguration?");
+
+            return null;
+        }
+
+        Path cdcDir = Paths.get(cdcRoot.getAbsolutePath(), 
dbStoreDirWithSubdirectory.getName());
+
+        if (!Files.exists(cdcDir)) {
+            log.warning(cdcRoot + " not exists. Should be create by Ignite 
Node. " +
+                "Is Change Data Capture enabled in IgniteConfiguration?");
+
+            return null;
+        }
+
+        this.cdcDir = cdcDir;
+
+        ChangeDataCaptureFileLockHolder lock = new 
ChangeDataCaptureFileLockHolder(cdcDir.toString(), "cdc.lock", log);
+
+        try {
+            lock.tryLock(cdcCfg.getLockTimeout());
+
+            return lock;
+        }
+        catch (IgniteCheckedException e) {
+            U.closeQuiet(lock);
+
+            if (log.isInfoEnabled())
+                log.info("Unable to acquire lock to file [" + cdcRoot + "], 
reason: " + e.getMessage());

Review comment:
       [data] is an incorrect output pattern

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/cdc/ChangeDataCapture.java
##########
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.ChangeDataCaptureConfiguration;
+import org.apache.ignite.cdc.ChangeDataCaptureConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureEvent;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
+import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.resource.GridResourceIoc;
+import 
org.apache.ignite.internal.processors.resource.GridResourceLoggerInjector;
+import 
org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.resources.SpringApplicationContextResource;
+import org.apache.ignite.resources.SpringResource;
+import org.apache.ignite.startup.cmdline.ChangeDataCaptureCommandLineStartup;
+
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+
+/**
+ * Change Data Capture(CDC) application.
+ * Application run independently of Ignite node process and provide ability 
for the {@link ChangeDataCaptureConsumer}
+ * to consume events({@link ChangeDataCaptureEvent}) from WAL segments.
+ * User should responsible {@link ChangeDataCaptureConsumer} implementation 
with custom consumption logic.
+ *
+ * Ignite node should be explicitly configured for using {@link 
ChangeDataCapture}.
+ * <ol>
+ *     <li>Set {@link 
DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)} to true.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setChangeDataCaptureWalPath(String)} to path to the 
directory
+ *     to store WAL segments for CDC.</li>
+ *     <li>Optional: Set {@link 
DataStorageConfiguration#setWalForceArchiveTimeout(long)} to configure timeout 
for
+ *     force WAL rollover, so new events will be available for consumptions 
with the predicted time.</li>
+ * </ol>
+ *
+ * When {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} is true 
then Ignite node on each WAL segment
+ * rollover creates hard link to archive WAL segment in
+ * {@link DataStorageConfiguration#getChangeDataCaptureWalPath()} directory. 
{@link ChangeDataCapture} application takes
+ * segment file and consumes events from it.
+ * After successful consumption (see {@link 
ChangeDataCaptureConsumer#onEvents(Iterator)}) WAL segment will be deleted
+ * from directory.
+ *
+ * Several Ignite nodes can be started on the same host.
+ * If your deployment done with custom consistent id then you should specify 
it via
+ * {@link IgniteConfiguration#setConsistentId(Serializable)} in provided 
{@link IgniteConfiguration}.
+ *
+ * Application works as follows:
+ * <ol>
+ *     <li>Search node work directory based on provided {@link 
IgniteConfiguration}.</li>
+ *     <li>Await for creation of CDC directory if it not exists.</li>
+ *     <li>Acquire file lock to ensure exclusive consumption.</li>
+ *     <li>Loads state of consumption if it exists.</li>
+ *     <li>Infinetely wait for new available segment and process it.</li>
+ * </ol>
+ *
+ * @see DataStorageConfiguration#setChangeDataCaptureEnabled(boolean)
+ * @see DataStorageConfiguration#setChangeDataCaptureWalPath(String)
+ * @see DataStorageConfiguration#setWalForceArchiveTimeout(long)
+ * @see ChangeDataCaptureCommandLineStartup
+ * @see ChangeDataCaptureConsumer
+ * @see DataStorageConfiguration#DFLT_WAL_CDC_PATH
+ */
+public class ChangeDataCapture implements Runnable {
+    /** */
+    public static final String ERR_MSG = "Persistence disabled. Capture Data 
Change can't run!";
+
+    /** State dir. */
+    public static final String STATE_DIR = "state";
+
+    /** Ignite configuration. */
+    private final IgniteConfiguration igniteCfg;
+
+    /** Spring resource context. */
+    private final GridSpringResourceContext ctx;
+
+    /** Change Data Capture configuration. */
+    private final ChangeDataCaptureConfiguration cdcCfg;
+
+    /** WAL iterator factory. */
+    private final IgniteWalIteratorFactory factory;
+
+    /** Events consumer. */
+    private final WALRecordsConsumer<?, ?> consumer;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Change Data Capture directory. */
+    private Path cdcDir;
+
+    /** Binary meta directory. */
+    private File binaryMeta;
+
+    /** Marshaller directory. */
+    private File marshaller;
+
+    /** Change Data Capture state. */
+    private ChangeDataCaptureConsumerState state;
+
+    /** Save state to start from. */
+    private WALPointer initState;
+
+    /** Stopped flag. */
+    private volatile boolean stopped;
+
+    /** Already processed segments. */
+    private final List<Path> processed = new ArrayList<>();
+
+    /**
+     * @param igniteCfg Ignite configuration.
+     * @param ctx Spring resource context.
+     * @param cdcCfg Change Data Capture configuration.
+     */
+    public ChangeDataCapture(
+        IgniteConfiguration igniteCfg,
+        GridSpringResourceContext ctx,
+        ChangeDataCaptureConfiguration cdcCfg) {
+        this.igniteCfg = new IgniteConfiguration(igniteCfg);
+        this.ctx = ctx;
+        this.cdcCfg = cdcCfg;
+
+        initWorkDir(this.igniteCfg);
+
+        log = logger(this.igniteCfg);
+
+        consumer = new WALRecordsConsumer<>(cdcCfg.getConsumer(), log);
+
+        factory = new IgniteWalIteratorFactory(log);
+    }
+
+    /** Runs Change Data Capture. */
+    @Override public void run() {
+        synchronized (this) {
+            if (stopped)
+                return;
+        }
+
+        try {
+            runX();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Runs Change Data Capture application with possible exception. */
+    public void runX() throws Exception {
+        if (!CU.isPersistenceEnabled(igniteCfg)) {
+            log.error(ERR_MSG);
+
+            throw new IllegalArgumentException(ERR_MSG);
+        }
+
+        PdsFolderSettings<ChangeDataCaptureFileLockHolder> settings =
+            new PdsFolderResolver<>(igniteCfg, log, null, 
this::tryLock).resolve();
+
+        if (settings == null)
+            throw new IgniteException("Can't find PDS folder!");
+
+        ChangeDataCaptureFileLockHolder lock = 
settings.getLockedFileLockHolder();
+
+        if (lock == null) {
+            File consIdDir = new File(settings.persistentStoreRootPath(), 
settings.folderName());
+
+            lock = tryLock(consIdDir);
+
+            if (lock == null)
+                throw new IgniteException("Can't lock Change Data Capture dir 
" + consIdDir.getAbsolutePath());
+        }
+
+        try {
+            String consIdDir = cdcDir.getName(cdcDir.getNameCount() - 
1).toString();
+
+            Files.createDirectories(cdcDir.resolve(STATE_DIR));
+
+            binaryMeta = 
CacheObjectBinaryProcessorImpl.binaryWorkDir(igniteCfg.getWorkDirectory(), 
consIdDir);
+
+            marshaller = 
MarshallerContextImpl.mappingFileStoreWorkDir(igniteCfg.getWorkDirectory());
+
+            injectResources(consumer.consumer());
+
+            ackAsciiLogo();
+
+            state = new 
ChangeDataCaptureConsumerState(cdcDir.resolve(STATE_DIR));
+
+            initState = state.load();
+
+            if (initState != null && log.isInfoEnabled())
+                log.info("Initial state loaded [state=" + initState + ']');
+
+            consumer.start();
+
+            try {
+                consumeWalSegmentsUntilStopped();
+            }
+            finally {
+                consumer.stop();
+
+                if (log.isInfoEnabled())
+                    log.info("Ignite Change Data Capture Application stoped.");
+            }
+        }
+        finally {
+            U.closeQuiet(lock);
+        }
+    }
+
+    /** Waits and consumes new WAL segments until stoped. */
+    public void consumeWalSegmentsUntilStopped() {
+        try {
+            Set<Path> seen = new HashSet<>();
+
+            AtomicLong lastSgmnt = new AtomicLong(-1);
+
+            while (!stopped) {
+                try (Stream<Path> cdcFiles = Files.walk(cdcDir, 1)) {
+                    Set<Path> exists = new HashSet<>();
+
+                    cdcFiles
+                        .peek(exists::add) // Store files that exists in cdc 
dir.
+                        // Need unseen WAL segments only.
+                        .filter(p -> 
WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
+                        .peek(seen::add) // Adds to seen.
+                        .sorted(Comparator.comparingLong(this::segmentIndex)) 
// Sort by segment index.
+                        .peek(p -> {
+                            long nextSgmnt = segmentIndex(p);
+
+                            assert lastSgmnt.get() == -1 || nextSgmnt - 
lastSgmnt.get() == 1;
+
+                            lastSgmnt.set(nextSgmnt);
+                        })
+                        .forEach(this::consumeSegment); // Consuming segments.
+
+                    seen.removeIf(p -> !exists.contains(p)); // Clean up seen 
set.
+                }
+
+                if (!stopped)
+                    U.sleep(cdcCfg.getCheckFrequency());
+            }
+        }
+        catch (IOException | IgniteInterruptedCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Reads all available records from segment. */
+    private void consumeSegment(Path segment) {
+        if (log.isInfoEnabled())
+            log.info("Processing WAL segment[segment=" + segment + ']');
+
+        IgniteWalIteratorFactory.IteratorParametersBuilder builder =
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .log(log)
+                .binaryMetadataFileStoreDir(binaryMeta)
+                .marshallerMappingFileStoreDir(marshaller)
+                .keepBinary(cdcCfg.isKeepBinary())
+                .filesOrDirs(segment.toFile())
+                .addFilter((type, ptr) -> type == DATA_RECORD_V2);
+
+        if (initState != null) {
+            long segmentIdx = segmentIndex(segment);
+
+            if (segmentIdx > initState.index()) {
+                log.error("Found segment greater then saved state. Some events 
are missed. Exiting!" +
+                    "[state=" + initState + ",segment=" + segmentIdx + ']');
+
+                throw new IgniteException("Some data missed.");
+            }
+
+            if (segmentIdx < initState.index()) {
+                if (log.isInfoEnabled()) {
+                    log.info("Already processed segment found. Skipping and 
deleting the file [segment=" +
+                        segmentIdx + ",state=" + initState.index() + ']');
+                }
+
+                // WAL segment is a hard link to a segment file in the special 
Change Data Capture folder.
+                // So, we can safely delete it after processing.
+                try {
+                    Files.delete(segment);
+
+                    return;
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+
+            builder.from(initState);
+
+            initState = null;
+        }
+
+        try (WALIterator it = factory.iterator(builder)) {
+            while (it.hasNext()) {
+                boolean commit = consumer.onRecords(F.iterator(it.iterator(), 
IgniteBiTuple::get2, true));
+
+                if (commit) {
+                    assert it.lastRead().isPresent();
+
+                    state.save(it.lastRead().get());
+
+                    // Can delete after new file state save.
+                    if (!processed.isEmpty()) {
+                        // WAL segment is a hard link to a segment file in a 
specifal Change Data Capture folder.
+                        // So we can safely delete it after success processing.
+                        for (Path prevSegment : processed)
+                            Files.delete(prevSegment);
+
+                        processed.clear();
+                    }
+                }
+            }
+        } catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+
+        processed.add(segment);
+    }
+
+    /**
+     * Try locks Change Data Capture directory.
+     *
+     * @param dbStoreDirWithSubdirectory Root PDS directory.
+     * @return Lock or null if lock failed.
+     */
+    private ChangeDataCaptureFileLockHolder tryLock(File 
dbStoreDirWithSubdirectory) {
+        if (!dbStoreDirWithSubdirectory.exists()) {
+            log.warning(dbStoreDirWithSubdirectory + " not exists.");
+
+            return null;
+        }
+
+        File cdcRoot = new 
File(igniteCfg.getDataStorageConfiguration().getChangeDataCaptureWalPath());
+
+        if (!cdcRoot.isAbsolute()) {
+            cdcRoot = new File(
+                igniteCfg.getWorkDirectory(),
+                
igniteCfg.getDataStorageConfiguration().getChangeDataCaptureWalPath()
+            );
+        }
+
+        if (!cdcRoot.exists()) {
+            log.warning(cdcRoot + " not exists. Should be created by Ignite 
Node. " +
+                "Is Change Data Capture enabled in IgniteConfiguration?");
+
+            return null;
+        }
+
+        Path cdcDir = Paths.get(cdcRoot.getAbsolutePath(), 
dbStoreDirWithSubdirectory.getName());
+
+        if (!Files.exists(cdcDir)) {
+            log.warning(cdcRoot + " not exists. Should be create by Ignite 
Node. " +
+                "Is Change Data Capture enabled in IgniteConfiguration?");
+
+            return null;
+        }
+
+        this.cdcDir = cdcDir;
+
+        ChangeDataCaptureFileLockHolder lock = new 
ChangeDataCaptureFileLockHolder(cdcDir.toString(), "cdc.lock", log);
+
+        try {
+            lock.tryLock(cdcCfg.getLockTimeout());
+
+            return lock;
+        }
+        catch (IgniteCheckedException e) {
+            U.closeQuiet(lock);
+
+            if (log.isInfoEnabled())
+                log.info("Unable to acquire lock to file [" + cdcRoot + "], 
reason: " + e.getMessage());
+
+            return null;
+        }
+    }
+
+    /**
+     * Initialize logger.
+     *
+     * @param cfg Configuration.
+     */
+    private static IgniteLogger logger(IgniteConfiguration cfg) {
+        try {
+            return IgnitionEx.IgniteNamedInstance.initLogger(

Review comment:
       Such a call looks odd to me.
   Can we relocate initLogger to U or similar? Seems it uses nothing from 
IgniteNamedInstance or IgnitionEx.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureOrderTest.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.TestCDCConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.User;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.plugin.AbstractCachePluginProvider;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.KEYS_CNT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.addAndWaitForConsumption;
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.cdcConfig;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.configuration.WALMode.FSYNC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class ChangeDataCaptureOrderTest extends GridCommonAbstractTest {
+    /** */
+    public static final String FOR_OTHER_DR_ID = "for-other-dr-id";
+
+    /** */
+    public static final byte OTHER_DR_ID = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        int segmentSz = 10 * 1024 * 1024;
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setChangeDataCaptureEnabled(true)
+            .setWalMode(FSYNC)
+            .setMaxWalArchiveSize(10 * segmentSz)
+            .setWalSegmentSize(segmentSz)
+            .setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)));
+
+        cfg.setPluginProviders(new AbstractTestPluginProvider() {
+            @Override public String name() {
+                return "ConflictResolverProvider";
+            }
+
+            @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+                if 
(!ctx.igniteCacheConfiguration().getName().equals(FOR_OTHER_DR_ID))
+                    return null;
+
+                return new AbstractCachePluginProvider() {
+                    @Override public @Nullable Object createComponent(Class 
cls) {
+                        if (cls != CacheConflictResolutionManager.class)
+                            return null;
+
+                        return new TestCacheConflictResolutionManager();
+                    }
+                };
+            }
+        });
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.beforeTest();
+    }
+
+    /** Simplest CDC test with usage of {@link 
IgniteInternalCache#putAllConflict(Map)}. */
+    @Test
+    public void testReadAllKeysWithOtherDc() throws Exception {
+        IgniteConfiguration cfg = getConfiguration("ignite-conflict-resolver");
+
+        IgniteEx ign = startGrid(cfg);
+
+        byte drId = (byte)1;
+
+        ign.context().cache().context().versions().dataCenterId(drId);
+
+        ign.cluster().state(ACTIVE);
+
+        TestCDCConsumer cnsmr = new TestCDCConsumer();
+
+        ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, 
cdcConfig(cnsmr));
+
+        IgniteCache<Integer, User> cache = 
ign.getOrCreateCache(FOR_OTHER_DR_ID);
+
+        cnsmr.drId = 1;
+        cnsmr.otherDrId = OTHER_DR_ID;
+
+        addAndWaitForConsumption(cnsmr, cdc, cache, null, 
this::addConflictData, 0, KEYS_CNT * 2, getTestTimeout());
+    }
+
+    /** */
+    @Test
+    public void testOrderIncrease() throws Exception {
+        IgniteConfiguration cfg = getConfiguration("ignite-0");
+
+        IgniteEx ign = startGrid(cfg);
+
+        ign.cluster().state(ACTIVE);
+
+        AtomicLong updCntr = new AtomicLong(0);
+        int key = 42;
+
+        ChangeDataCaptureConsumer cnsmr = new ChangeDataCaptureConsumer() {
+            private long order = -1;
+
+            @Override public boolean onEvents(Iterator<ChangeDataCaptureEvent> 
evts) {
+                evts.forEachRemaining(evt -> {
+                    assertEquals(key, evt.key());
+
+                    assertTrue(evt.version().order() > order);
+
+                    order = evt.version().order();
+
+                    updCntr.incrementAndGet();
+                });
+
+                return true;
+            }
+
+            @Override public void start() {
+                // No-op.
+            }
+
+            @Override public void stop() {
+                // No-op.
+            }
+        };
+
+        ChangeDataCapture cdc = new ChangeDataCapture(cfg, null, 
cdcConfig(cnsmr));
+
+        IgniteCache<Integer, User> cache = ign.getOrCreateCache("my-cache");
+
+        IgniteInternalFuture<?> fut = runAsync(cdc);
+
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.put(key, new User("John Connor " + i, 42 + i, null));
+
+        assertTrue(waitForCondition(() -> updCntr.get() == KEYS_CNT, 
getTestTimeout()));
+
+        fut.cancel();
+    }
+
+    /** */
+    private void addConflictData(IgniteCache<Integer, User> cache, int from, 
int to) {
+        try {
+            IgniteEx ign = (IgniteEx)G.allGrids().get(0);
+
+            IgniteInternalCache intCache = ign.cachex(cache.getName());
+
+            Map<KeyCacheObject, GridCacheDrInfo> drMap = new HashMap<>();
+
+            for (int i = from; i < to; i++) {
+                byte[] bytes = new byte[1024];
+
+                ThreadLocalRandom.current().nextBytes(bytes);
+
+                KeyCacheObject key = new KeyCacheObjectImpl(i, null, 
intCache.affinity().partition(i));
+                CacheObject val =
+                    new CacheObjectImpl(new User("John Connor " + i, 42 + i, 
bytes), null);
+
+                val.prepareMarshal(intCache.context().cacheObjectContext());
+
+                drMap.put(key, new GridCacheDrInfo(val, new 
GridCacheVersion(1, i, 1, OTHER_DR_ID)));
+            }
+
+            intCache.putAllConflict(drMap);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    public static class TestCacheConflictResolutionManager<K, V> extends 
GridCacheManagerAdapter<K, V>
+        implements CacheConflictResolutionManager<K, V> {
+        @Override public CacheVersionConflictResolver conflictResolver() {
+            return new CacheVersionConflictResolver() {
+                @Override public <K, V> GridCacheVersionConflictContext<K, V> 
resolve(

Review comment:
       seems you have broken generics here.
   These aren't the K,V you're looking for :) 

##########
File path: 
modules/core/src/test/java/org/apache/ignite/cdc/ChangeDataCaptureOrderTest.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.TestCDCConsumer;
+import org.apache.ignite.cdc.ChangeDataCaptureSelfTest.User;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.ChangeDataCapture;
+import 
org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import 
org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.plugin.AbstractCachePluginProvider;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.KEYS_CNT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static 
org.apache.ignite.cdc.ChangeDataCaptureSelfTest.addAndWaitForConsumption;
+import static org.apache.ignite.cdc.ChangeDataCaptureSelfTest.cdcConfig;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.configuration.WALMode.FSYNC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class ChangeDataCaptureOrderTest extends GridCommonAbstractTest {
+    /** */
+    public static final String FOR_OTHER_DR_ID = "for-other-dr-id";
+
+    /** */
+    public static final byte OTHER_DR_ID = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        int segmentSz = 10 * 1024 * 1024;
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setChangeDataCaptureEnabled(true)
+            .setWalMode(FSYNC)

Review comment:
       CDC should work even if BACKGROUND is set?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to