IGNITE-6014 Added TX marker records. This closes #2578

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5377af2c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5377af2c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5377af2c

Branch: refs/heads/ignite-3478
Commit: 5377af2c24642944960b9953e15ac5badccb1a16
Parents: f9955fd
Author: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Authored: Mon Sep 18 16:45:30 2017 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Mon Sep 18 16:56:19 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   8 +
 .../managers/discovery/ConsistentIdMapper.java  | 101 ++++++++
 .../internal/pagemem/wal/record/TxRecord.java   | 136 ++++++----
 .../cache/distributed/dht/GridDhtTxRemote.java  |   5 +
 .../distributed/near/GridNearTxRemote.java      |   5 +
 .../wal/serializer/RecordV1Serializer.java      |  24 +-
 .../wal/serializer/TxRecordSerializer.java      | 228 +++++++++++++++++
 .../cache/transactions/IgniteTxAdapter.java     |  46 ++++
 .../cache/transactions/IgniteTxManager.java     |  14 +
 .../db/wal/IgniteWalRecoveryTest.java           | 256 +++++++++++++++++++
 .../db/wal/reader/IgniteWalReaderTest.java      |   4 +-
 .../db/wal/reader/MockWalIteratorFactory.java   |   1 +
 12 files changed, 768 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 39c19fb..ec79026 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -714,6 +714,14 @@ public final class IgniteSystemProperties {
             "IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD";
 
     /**
+     * If the property is set {@link 
org.apache.ignite.internal.pagemem.wal.record.TxRecord} records
+     * will be logged to WAL.
+     *
+     * Default value is {@code false}.
+     */
+    public static final String IGNITE_WAL_LOG_TX_RECORDS = 
"IGNITE_WAL_LOG_TX_RECORDS";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
new file mode 100644
index 0000000..c524331
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Maps node UUIDs to consistent ids and vice versa.
+ */
+public class ConsistentIdMapper {
+    /** Discovery manager. */
+    private final GridDiscoveryManager discoveryManager;
+
+    /**
+     * Create an instance of mapper.
+     *
+     * @param discoveryManager Discovery manager.
+     */
+    public ConsistentIdMapper(GridDiscoveryManager discoveryManager) {
+        this.discoveryManager = discoveryManager;
+    }
+
+    /**
+     * Map UUID to consistent id.
+     *
+     * @param topVer Topology version.
+     * @param nodeId UUID of node.
+     * @return Consistent id of node.
+     */
+    public Object mapToConsistentId(AffinityTopologyVersion topVer, UUID 
nodeId) {
+        ClusterNode node = discoveryManager.node(topVer, nodeId);
+
+        if (node == null)
+            throw new IllegalStateException("Unable to find node by UUID 
[nodeId=" + nodeId + ", topVer=" + topVer + ']');
+
+        return node.consistentId();
+    }
+
+    /**
+     * Map consistent id to UUID.
+     *
+     * @param consistentId Consistent id of node.
+     * @return UUID of node.
+     */
+    @Nullable public UUID mapToUUID(Object consistentId) {
+        for (ClusterNode node : discoveryManager.allNodes())
+            if (node.consistentId().equals(consistentId))
+                return node.id();
+
+        return null;
+    }
+
+    /**
+     * Map primary -> backup node UUIDs to consistent ids.
+     *
+     * @param txNodes Primary -> backup UUID nodes.
+     * @return Primary -> backup consistent id nodes.
+     */
+    public Map<Object, Collection<Object>> 
mapToConsistentIds(AffinityTopologyVersion topVer, @Nullable Map<UUID, 
Collection<UUID>> txNodes) {
+        if (txNodes == null)
+            return null;
+
+        Map<Object, Collection<Object>> consistentMap = 
U.newHashMap(txNodes.keySet().size());
+
+        for (UUID node : txNodes.keySet()) {
+            Collection<UUID> backupNodes = txNodes.get(node);
+
+            Collection<Object> consistentIdsBackups = new 
ArrayList<>(backupNodes.size());
+
+            for (UUID backup : backupNodes)
+                consistentIdsBackups.add(mapToConsistentId(topVer, backup));
+
+            consistentMap.put(mapToConsistentId(topVer, node), 
consistentIdsBackups);
+        }
+
+        return consistentMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
index 9bb747b..ce1e28e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
@@ -17,56 +17,61 @@
 
 package org.apache.ignite.internal.pagemem.wal.record;
 
-import java.util.UUID;
+import java.util.Collection;
+import java.util.Map;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Logical data record indented for transaction (tx) related actions.<br>
  * This record is marker of begin, prepare, commit, and rollback transactions.
  */
 public class TxRecord extends WALRecord {
-    /**
-     * Tx action enum.
-     */
-    public enum TxAction {
-        /** Transaction begin. */
-        BEGIN,
-
-        /** Transaction prepare. */
-        PREPARE,
-
-        /** Transaction commit. */
-        COMMIT,
-
-        /** Transaction rollback. */
-        ROLLBACK;
-
-        /** Available values. */
-        private static final TxAction[] VALS = TxAction.values();
-
-        /**
-         * Gets tx action value from ordinal.
-         *
-         * @param ord Ordinal.
-         * @return Value.
-         */
-        public static TxAction fromOrdinal(int ord) {
-            return ord < 0 || ord >= VALS.length ? null : VALS[ord];
-        }
-    }
+    /** Transaction state. */
+    private TransactionState state;
 
-    /** */
-    private TxAction action;
-
-    /** Global transaction identifier within cluster, assigned by transaction 
coordinator */
+    /** Global transaction identifier within cluster, assigned by transaction 
coordinator. */
     private GridCacheVersion nearXidVer;
 
-    /** */
-    private GridCacheVersion dhtVer;
+    /** Transaction entries write topology version. */
+    private GridCacheVersion writeVer;
+
+    /**
+     * Transaction participating nodes.
+     *
+     * Structure:
+     * Primary node -> [Backup nodes...]
+     **/
+    @Nullable private Map<Object, Collection<Object>> participatingNodes;
+
+    /** If transaction is remote, primary node for this backup node. */
+    @Nullable private Object primaryNode;
+
+    /** Timestamp of Tx state change. */
+    private long timestamp;
 
-    /** */
-    private UUID[] participatingNodeIds;
+    /**
+     * Creates a transaction state marker record.
+     * @param state Transaction state.
+     * @param nearXidVer Transaction id.
+     * @param writeVer Transaction entries write topology version.
+     * @param participatingNodes Primary -> Backup nodes participating in 
transaction.
+     */
+    public TxRecord(TransactionState state,
+                    GridCacheVersion nearXidVer,
+                    GridCacheVersion writeVer,
+                    @Nullable Map<Object, Collection<Object>> 
participatingNodes,
+                    @Nullable Object primaryNode,
+                    long timestamp) {
+        this.state = state;
+        this.nearXidVer = nearXidVer;
+        this.writeVer = writeVer;
+        this.participatingNodes = participatingNodes;
+        this.primaryNode = primaryNode;
+        this.timestamp = timestamp;
+    }
 
     /** {@inheritDoc} */
     @Override public RecordType type() {
@@ -90,43 +95,64 @@ public class TxRecord extends WALRecord {
     /**
      * @return DHT version.
      */
-    public GridCacheVersion dhtVersion() {
-        return dhtVer;
+    public GridCacheVersion writeVersion() {
+        return writeVer;
+    }
+
+    /**
+     * @param writeVer Transaction entries write topology version.
+     */
+    public void dhtVersion(GridCacheVersion writeVer) {
+        this.writeVer = writeVer;
+    }
+
+    /**
+     * @return Transaction state.
+     */
+    public TransactionState state() {
+        return state;
+    }
+
+    /**
+     * @param state Transaction state.
+     */
+    public void state(TransactionState state) {
+        this.state = state;
     }
 
     /**
-     * @param dhtVer DHT version.
+     * @return Primary -> backup participating nodes.
      */
-    public void dhtVersion(GridCacheVersion dhtVer) {
-        this.dhtVer = dhtVer;
+    public Map<Object, Collection<Object>> participatingNodes() {
+        return participatingNodes;
     }
 
     /**
-     * @return Action.
+     * @param participatingNodeIds Primary -> backup participating nodes.
      */
-    public TxAction action() {
-        return action;
+    public void participatingNodes(Map<Object, Collection<Object>> 
participatingNodeIds) {
+        this.participatingNodes = participatingNodeIds;
     }
 
     /**
-     * @param action Action.
+     * @return {@code true} if a primary node is set, i.e. the transaction is remote for a backup.
      */
-    public void action(TxAction action) {
-        this.action = action;
+    public boolean remote() {
+        return primaryNode != null;
     }
 
     /**
-     * @param participatingNodeIds Participating node IDs.
+     * @return Primary node for backup if transaction is remote.
      */
-    public void participatingNodeIds(UUID[] participatingNodeIds) {
-        this.participatingNodeIds = participatingNodeIds;
+    @Nullable public Object primaryNode() {
+        return primaryNode;
     }
 
     /**
-     * @return Participating node IDs.
+     * @return Timestamp of Tx state change in millis.
      */
-    public UUID[] participatingNodeId() {
-        return participatingNodeIds;
+    public long timestamp() {
+        return timestamp;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 4373cda..746eb38 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -229,6 +229,11 @@ public class GridDhtTxRemote extends 
GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean remote() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean dht() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index e5cd469..5477af9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -217,6 +217,11 @@ public class GridNearTxRemote extends 
GridDistributedTxRemoteAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean remote() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean near() {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index b78e2e3..ce6fdc7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.LazyDataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
@@ -83,6 +84,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
@@ -93,7 +95,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.RecordSeriali
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.F;
@@ -139,6 +140,9 @@ public class RecordV1Serializer implements RecordSerializer 
{
     /** Write pointer. */
     private final boolean writePointer;
 
+    /** Serializer of {@link TxRecord} records. */
+    private TxRecordSerializer txRecordSerializer;
+
     /**
      * @param cctx Cache shared context.
      */
@@ -155,6 +159,7 @@ public class RecordV1Serializer implements RecordSerializer 
{
 
         co = cctx.kernalContext().cacheObjects();
         pageSize = cctx.database().pageSize();
+        txRecordSerializer = new TxRecordSerializer(cctx);
     }
 
     /** {@inheritDoc} */
@@ -661,6 +666,11 @@ public class RecordV1Serializer implements 
RecordSerializer {
 
                 break;
 
+            case TX_RECORD:
+                txRecordSerializer.writeTxRecord((TxRecord) record, buf);
+
+                break;
+
             default:
                 throw new UnsupportedOperationException("Type: " + 
record.type());
         }
@@ -1246,6 +1256,11 @@ public class RecordV1Serializer implements 
RecordSerializer {
             case SWITCH_SEGMENT_RECORD:
                 throw new EOFException("END OF SEGMENT");
 
+            case TX_RECORD:
+                res = txRecordSerializer.readTxRecord(in);
+
+                break;
+
             default:
                 throw new UnsupportedOperationException("Type: " + recType);
         }
@@ -1417,6 +1432,9 @@ public class RecordV1Serializer implements 
RecordSerializer {
             case SWITCH_SEGMENT_RECORD:
                 return commonFields - CRC_SIZE; //CRC is not loaded for switch 
segment, exception is thrown instead
 
+            case TX_RECORD:
+                return commonFields + 
txRecordSerializer.sizeOfTxRecord((TxRecord) record);
+
             default:
                 throw new UnsupportedOperationException("Type: " + 
record.type());
         }
@@ -1659,7 +1677,7 @@ public class RecordV1Serializer implements 
RecordSerializer {
      * @param ver Version to write.
      * @param allowNull Is {@code null}version allowed.
      */
-    private void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean 
allowNull) {
+    static void putVersion(ByteBuffer buf, GridCacheVersion ver, boolean 
allowNull) {
         CacheVersionIO.write(buf, ver, allowNull);
     }
 
@@ -1670,7 +1688,7 @@ public class RecordV1Serializer implements 
RecordSerializer {
      * @param allowNull Is {@code null}version allowed.
      * @return Read cache version.
      */
-    private GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean 
allowNull) throws IOException {
+    static GridCacheVersion readVersion(ByteBufferBackedDataInput in, boolean 
allowNull) throws IOException {
         // To be able to read serialization protocol version.
         in.ensure(1);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
new file mode 100644
index 0000000..448bdbc
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/TxRecordSerializer.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.TransactionState;
+
+/**
+ * {@link TxRecord} WAL serializer.
+ */
+public class TxRecordSerializer {
+    /** Cache shared context. */
+    private GridCacheSharedContext cctx;
+
+    /** Class loader to unmarshal consistent ids. */
+    private ClassLoader classLoader;
+
+    /**
+     * Create an instance of serializer.
+     *
+     * @param cctx Cache shared context.
+     */
+    public TxRecordSerializer(GridCacheSharedContext cctx) {
+        this.cctx = cctx;
+
+        classLoader = U.resolveClassLoader(cctx.gridConfig());
+    }
+
+    /**
+     * Writes {@link TxRecord} to given buffer.
+     *
+     * @param record TxRecord.
+     * @param buf Byte buffer.
+     * @throws IgniteCheckedException In case of fail.
+     */
+    public void writeTxRecord(TxRecord record, ByteBuffer buf) throws 
IgniteCheckedException {
+        buf.put((byte) record.state().ordinal());
+        RecordV1Serializer.putVersion(buf, record.nearXidVersion(), true);
+        RecordV1Serializer.putVersion(buf, record.writeVersion(), true);
+
+        if (record.participatingNodes() != null) {
+            buf.putInt(record.participatingNodes().keySet().size());
+
+            for (Object primaryNode : record.participatingNodes().keySet()) {
+                writeConsistentId(primaryNode, buf);
+
+                Collection<Object> backupNodes = 
record.participatingNodes().get(primaryNode);
+
+                buf.putInt(backupNodes.size());
+
+                for (Object backupNode : backupNodes) {
+                    writeConsistentId(backupNode, buf);
+                }
+            }
+        }
+        else {
+            // Put zero size of participating nodes.
+            buf.putInt(0);
+        }
+
+        buf.put((byte) (record.remote() ? 1 : 0));
+
+        if (record.remote())
+            writeConsistentId(record.primaryNode(), buf);
+
+        buf.putLong(record.timestamp());
+    }
+
+    /**
+     * Reads {@link TxRecord} from given input.
+     *
+     * @param in Input
+     * @return TxRecord.
+     * @throws IOException In case of fail.
+     * @throws IgniteCheckedException In case of fail.
+     */
+    public TxRecord readTxRecord(ByteBufferBackedDataInput in) throws 
IOException, IgniteCheckedException {
+        byte txState = in.readByte();
+        TransactionState state = TransactionState.fromOrdinal(txState);
+
+        GridCacheVersion nearXidVer = RecordV1Serializer.readVersion(in, true);
+        GridCacheVersion writeVer = RecordV1Serializer.readVersion(in, true);
+
+        int participatingNodesSize = in.readInt();
+        Map<Object, Collection<Object>> participatingNodes = new HashMap<>(2 * 
participatingNodesSize);
+
+        for (int i = 0; i < participatingNodesSize; i++) {
+            Object primaryNode = readConsistentId(in);
+
+            int backupNodesSize = in.readInt();
+
+            Collection<Object> backupNodes = new ArrayList<>(backupNodesSize);
+
+            for (int j = 0; j < backupNodesSize; j++) {
+                Object backupNode = readConsistentId(in);
+
+                backupNodes.add(backupNode);
+            }
+
+            participatingNodes.put(primaryNode, backupNodes);
+        }
+
+        boolean hasRemote = in.readByte() == 1;
+
+        Object primaryNode = null;
+
+        if (hasRemote)
+            primaryNode = readConsistentId(in);
+
+        long timestamp = in.readLong();
+
+        return new TxRecord(state, nearXidVer, writeVer, participatingNodes, 
primaryNode, timestamp);
+    }
+
+    /**
+     * Returns size of marshalled {@link TxRecord} in bytes.
+     *
+     * @param record TxRecord.
+     * @return Size of TxRecord in bytes.
+     * @throws IgniteCheckedException In case of fail.
+     */
+    public int sizeOfTxRecord(TxRecord record) throws IgniteCheckedException {
+        int size = 0;
+
+        size += /* transaction state. */ 1;
+        size += CacheVersionIO.size(record.nearXidVersion(), true);
+        size += CacheVersionIO.size(record.writeVersion(), true);
+
+        size += /* primary nodes count. */ 4;
+
+        if (record.participatingNodes() != null) {
+            for (Object primaryNode : record.participatingNodes().keySet()) {
+                size += /* byte array length. */ 4;
+                size += marshalConsistentId(primaryNode).length;
+
+                Collection<Object> backupNodes = 
record.participatingNodes().get(primaryNode);
+
+                size += /* size of backup nodes. */ 4;
+
+                for (Object backupNode : backupNodes) {
+                    size += /* byte array length. */ 4;
+                    size += marshalConsistentId(backupNode).length;
+                }
+            }
+        }
+
+        size += /* Primary node presence flag. */ 1;
+
+        if (record.remote()) {
+            size += /* byte array length. */ 4;
+            size += marshalConsistentId(record.primaryNode()).length;
+        }
+
+        size += /* Timestamp */ 8;
+
+        return size;
+    }
+
+    /**
+     * Marshal consistent id to byte array.
+     *
+     * @param consistentId Consistent id.
+     * @return Marshalled byte array.
+     * @throws IgniteCheckedException In case of fail.
+     */
+    private byte[] marshalConsistentId(Object consistentId) throws 
IgniteCheckedException {
+        return cctx.marshaller().marshal(consistentId);
+    }
+
+    /**
+     * Read consistent id from given input.
+     *
+     * @param in Input.
+     * @return Consistent id.
+     * @throws IOException In case of fail.
+     * @throws IgniteCheckedException In case of fail.
+     */
+    private Object readConsistentId(ByteBufferBackedDataInput in) throws 
IOException, IgniteCheckedException {
+        int len = in.readInt();
+        in.ensure(len);
+
+        byte[] content = new byte[len];
+        in.readFully(content);
+
+        return cctx.marshaller().unmarshal(content, classLoader);
+    }
+
+    /**
+     * Write consistent id to given buffer.
+     *
+     * @param consistentId Consistent id.
+     * @param buf Byte buffer.
+     * @throws IgniteCheckedException In case of fail.
+     */
+    private void writeConsistentId(Object consistentId, ByteBuffer buf) throws 
IgniteCheckedException {
+        byte[] content = marshalConsistentId(consistentId);
+
+        buf.putInt(content.length);
+        buf.put(content);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 4d85db5..c447436 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -39,10 +39,13 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.ConsistentIdMapper;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
 import org.apache.ignite.internal.processors.cache.CacheLazyEntry;
@@ -94,6 +97,7 @@ import static 
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED
 import static 
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
 import static org.apache.ignite.transactions.TransactionState.ACTIVE;
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
 import static org.apache.ignite.transactions.TransactionState.PREPARED;
@@ -245,6 +249,9 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
     /** Store used flag. */
     protected boolean storeEnabled = true;
 
+    /** UUID to consistent id mapper. */
+    protected ConsistentIdMapper consistentIdMapper;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -308,6 +315,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, this);
+
+        consistentIdMapper = new ConsistentIdMapper(cctx.discovery());
     }
 
     /**
@@ -357,6 +366,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, this);
+
+        consistentIdMapper = new ConsistentIdMapper(cctx.discovery());
     }
 
     /** {@inheritDoc} */
@@ -570,6 +581,13 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         return log;
     }
 
+    /**
+     * Tells whether this transaction applies changes in the primary -> backup direction.
+     * This base implementation always answers {@code false}. The result is consulted when a
+     * {@link TxRecord} is written on a state change: {@code nodeId()} is passed to the record
+     * only if this method returns {@code true}, otherwise {@code null} is passed.
+     * NOTE(review): remote transaction subclasses presumably override this - confirm.
+     *
+     * @return {@code true} if transaction reflects changes in primary -> backup direction.
+     */
+    public boolean remote() {
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean near() {
         return false;
@@ -1075,6 +1093,34 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
             // Seal transactions maps.
             if (state != ACTIVE && state != SUSPENDED)
                 seal();
+
+            if (cctx.wal() != null && cctx.tm().logTxRecords()) {
+                // Log tx state change to WAL.
+                if (state == PREPARED || state == COMMITTED || state == 
ROLLED_BACK) {
+                    assert txNodes != null || state == ROLLED_BACK;
+
+                    Map<Object, Collection<Object>> participatingNodes = 
consistentIdMapper
+                        .mapToConsistentIds(topVer, txNodes);
+
+                    TxRecord txRecord = new TxRecord(
+                            state,
+                            nearXidVersion(),
+                            writeVersion(),
+                            participatingNodes,
+                            remote() ? nodeId() : null,
+                            U.currentTimeMillis()
+                    );
+
+                    try {
+                        cctx.wal().log(txRecord);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to log TxRecord: " + txRecord, e);
+
+                        throw new IgniteException("Failed to log TxRecord: " + 
txRecord, e);
+                    }
+                }
+            }
         }
 
         return valid;

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 474b484..9a8280f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -98,6 +99,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_C
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
@@ -202,6 +204,9 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     /** TxDeadlock detection. */
     private TxDeadlockDetection txDeadlockDetection;
 
+    /** Flag indicates that {@link TxRecord} records will be logged to WAL. */
+    private boolean logTxRecords;
+
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         cctx.gridIO().removeMessageListener(TOPIC_TX);
@@ -276,6 +281,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
         this.txDeadlockDetection = new TxDeadlockDetection(cctx);
 
         cctx.gridIO().addMessageListener(TOPIC_TX, new 
DeadlockDetectionListener());
+
+        this.logTxRecords = 
IgniteSystemProperties.getBoolean(IGNITE_WAL_LOG_TX_RECORDS, false);
     }
 
     /**
@@ -2315,6 +2322,13 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Reports whether transaction state markers are written to the write-ahead log. The flag is
+     * read from the {@code IGNITE_WAL_LOG_TX_RECORDS} system property (default {@code false}).
+     *
+     * @return {@code true} if {@link TxRecord} records should be logged to WAL.
+     */
+    public boolean logTxRecords() {
+        return logTxRecords;
+    }
+
+    /**
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends 
GridTimeoutObjectAdapter {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
index 399e36d..718a9a8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRecoveryTest.java
@@ -22,9 +22,12 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -33,8 +36,11 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -53,14 +59,18 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
 import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
+import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.PAX;
@@ -75,6 +85,9 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.junit.Assert;
 import sun.nio.ch.DirectBuffer;
 
@@ -1014,6 +1027,249 @@ public class IgniteWalRecoveryTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Test recovery from WAL on 3 nodes in case of transactional cache.
+     * <p>
+     * Runs a series of transactions that are randomly committed or rolled back, restarts the whole
+     * cluster and checks that every committed value can still be read.
+     *
+     * @throws Exception If fail.
+     */
+    public void testRecoveryOnTransactionalAndPartitionedCache() throws Exception {
+        try {
+            // Grids are started inside try so that they are stopped even if start or activation fails.
+            IgniteEx ignite = (IgniteEx) startGrids(3);
+            ignite.active(true);
+
+            final String cacheName = "transactional";
+
+            CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName)
+                    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                    .setAffinity(new RendezvousAffinityFunction(false, 32))
+                    .setCacheMode(CacheMode.PARTITIONED)
+                    .setRebalanceMode(CacheRebalanceMode.SYNC)
+                    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                    .setBackups(2);
+
+            ignite.createCache(cacheConfiguration);
+
+            IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+            // Expected cache content: only changes of committed transactions end up here.
+            Map<Object, Object> map = new HashMap<>();
+
+            final int transactions = 100;
+            final int operationsPerTransaction = 40;
+
+            Random random = new Random();
+
+            for (int t = 1; t <= transactions; t++) {
+                Map<Object, Object> changesInTransaction = new HashMap<>();
+
+                // try-with-resources closes the transaction even if one of the puts fails.
+                try (Transaction tx = ignite.transactions().txStart(
+                        TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+                    for (int op = 0; op < operationsPerTransaction; op++) {
+                        int key = random.nextInt(1000) + 1;
+
+                        Object value;
+
+                        if (random.nextBoolean())
+                            value = randomString(random) + key;
+                        else
+                            value = new BigObject(key);
+
+                        changesInTransaction.put(key, value);
+
+                        cache.put(key, value);
+                    }
+
+                    if (random.nextBoolean()) {
+                        tx.commit();
+
+                        map.putAll(changesInTransaction);
+                    }
+                    else
+                        tx.rollback();
+                }
+
+                if (t % 50 == 0)
+                    log.info("Finished transaction " + t);
+            }
+
+            stopAllGrids();
+
+            ignite = (IgniteEx) startGrids(3);
+            ignite.active(true);
+
+            cache = ignite.cache(cacheName);
+
+            for (Map.Entry<Object, Object> entry : map.entrySet()) {
+                Object key = entry.getKey();
+
+                Assert.assertEquals("Unexpected value for key " + key, entry.getValue(), cache.get(key));
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Test that all DataRecord WAL records are within transaction boundaries - PREPARED and COMMITTED markers.
+     * <p>
+     * TX marker logging is switched on through the {@link IgniteSystemProperties#IGNITE_WAL_LOG_TX_RECORDS}
+     * system property for the duration of the test; the property is always cleared afterwards.
+     *
+     * @throws Exception If any fail.
+     */
+    public void testTxRecordsConsistency() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true");
+
+        try {
+            // Grids are started inside try: a failed start must still clear the property and stop the grids.
+            IgniteEx ignite = (IgniteEx) startGrids(3);
+            ignite.active(true);
+
+            final String cacheName = "transactional";
+
+            CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName)
+                    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                    .setAffinity(new RendezvousAffinityFunction(false, 32))
+                    .setCacheMode(CacheMode.PARTITIONED)
+                    .setRebalanceMode(CacheRebalanceMode.SYNC)
+                    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                    .setBackups(0);
+
+            ignite.createCache(cacheConfiguration);
+
+            IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+            GridCacheSharedContext<Object, Object> sharedCtx = ignite.context().cache().context();
+
+            GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database();
+
+            db.waitForCheckpoint("test");
+            db.enableCheckpoints(false).get();
+
+            // Log something to know where to start.
+            WALPointer startPtr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));
+
+            final int transactions = 100;
+            final int operationsPerTransaction = 40;
+
+            Random random = new Random();
+
+            for (int t = 1; t <= transactions; t++) {
+                // try-with-resources closes the transaction even if one of the puts fails.
+                try (Transaction tx = ignite.transactions().txStart(
+                        TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+                    for (int op = 0; op < operationsPerTransaction; op++) {
+                        int key = random.nextInt(1000) + 1;
+
+                        Object value;
+
+                        if (random.nextBoolean())
+                            value = randomString(random) + key;
+                        else
+                            value = new BigObject(key);
+
+                        cache.put(key, value);
+                    }
+
+                    if (random.nextBoolean())
+                        tx.commit();
+                    else
+                        tx.rollback();
+                }
+
+                if (t % 50 == 0)
+                    log.info("Finished transaction " + t);
+            }
+
+            // Near versions of transactions that have a PREPARED marker but no COMMITTED/ROLLED_BACK marker yet.
+            Set<GridCacheVersion> activeTransactions = new HashSet<>();
+
+            // Check that all DataRecords are within PREPARED and COMMITTED tx records.
+            try (WALIterator it = sharedCtx.wal().replay(startPtr)) {
+                while (it.hasNext()) {
+                    IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
+
+                    WALRecord rec = tup.get2();
+
+                    if (rec instanceof TxRecord) {
+                        TxRecord txRecord = (TxRecord) rec;
+                        GridCacheVersion txId = txRecord.nearXidVersion();
+
+                        switch (txRecord.state()) {
+                            case PREPARED:
+                                assert !activeTransactions.contains(txId) : "Transaction is already present " + txRecord;
+
+                                activeTransactions.add(txId);
+
+                                break;
+
+                            case COMMITTED:
+                                assert activeTransactions.contains(txId) : "No PREPARE marker for transaction " + txRecord;
+
+                                activeTransactions.remove(txId);
+
+                                break;
+
+                            case ROLLED_BACK:
+                                // No assertion here: the transaction is simply dropped from the active set.
+                                activeTransactions.remove(txId);
+
+                                break;
+
+                            default:
+                                throw new IllegalStateException("Unknown Tx state of record " + txRecord);
+                        }
+                    }
+                    else if (rec instanceof DataRecord) {
+                        DataRecord dataRecord = (DataRecord) rec;
+
+                        for (DataEntry entry : dataRecord.writeEntries()) {
+                            GridCacheVersion txId = entry.nearXidVersion();
+
+                            assert activeTransactions.contains(txId) : "No transaction for entry " + entry;
+                        }
+                    }
+                }
+            }
+        }
+        finally {
+            System.clearProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS);
+
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Generate random lowercase string for test purposes.
+     *
+     * @param random Source of randomness.
+     * @return String of 1 to 50 characters, each in the range {@code 'a'..'z'}.
+     */
+    private String randomString(Random random) {
+        int len = random.nextInt(50) + 1;
+
+        StringBuilder sb = new StringBuilder(len);
+
+        for (int i = 0; i < len; i++)
+            // The cast is required: int + char is promoted to int and append(int) would add decimal digits.
+            sb.append((char)(random.nextInt(26) + 'a'));
+
+        return sb.toString();
+    }
+
+    /**
+     * BigObject for test purposes that don't fit in page size.
+     * Two instances are equal when both the index and the payload content are equal.
+     */
+    private static class BigObject {
+        /** Index the payload is derived from. */
+        private final int index;
+
+        /** Payload bytes. */
+        private final byte[] payload = new byte[4096];
+
+        /**
+         * @param index Index used to fill the payload; must be non-zero because it is used as a divisor.
+         */
+        BigObject(int index) {
+            this.index = index;
+
+            // Create pseudo-random array.
+            for (int i = 0; i < payload.length; i++) {
+                if (i % index == 0)
+                    payload[i] = (byte) index;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            BigObject bigObject = (BigObject) o;
+
+            return index == bigObject.index && Arrays.equals(payload, bigObject.payload);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            // Hash the array content, not its identity, to stay consistent with equals().
+            return Objects.hash(index, Arrays.hashCode(payload));
+        }
+    }
+
+    /**
      * @param size Size of data.
      * @return Test data.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
index 10e637b..ebb80a1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -105,7 +105,7 @@ public class IgniteWalReaderTest extends 
GridCommonAbstractTest {
      * Field for transferring setting from test to getConfig method
      * Archive incomplete segment after inactivity milliseconds.
      */
-    private int archiveIncompleteSegmentAfterInactivityMs = 0;
+    private int archiveIncompleteSegmentAfterInactivityMs;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
@@ -791,7 +791,7 @@ public class IgniteWalReaderTest extends 
GridCommonAbstractTest {
                     final TxRecord txRecord = (TxRecord)walRecord;
                     final GridCacheVersion globalTxId = 
txRecord.nearXidVersion();
 
-                    log.info("//Tx Record, action: " + txRecord.action() +
+                    log.info("//Tx Record, state: " + txRecord.state() +
                         "; nearTxVersion" + globalTxId);
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5377af2c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
index f90ae37..4030e53 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
@@ -104,6 +104,7 @@ public class MockWalIteratorFactory {
 
         when(sctx.kernalContext()).thenReturn(ctx);
         when(sctx.discovery()).thenReturn(disco);
+        when(sctx.gridConfig()).thenReturn(cfg);
 
         final GridCacheDatabaseSharedManager database = 
Mockito.mock(GridCacheDatabaseSharedManager.class);
 

Reply via email to