http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java new file mode 100644 index 0000000..c952a48 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumMetricsReducer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.lang.IgniteReducer; +import org.jetbrains.annotations.Nullable; + +/** + * Vacuum metrics reducer: sums the counters of every collected {@link VacuumMetrics} into one aggregate.
+ */ +public class VacuumMetricsReducer implements IgniteReducer<VacuumMetrics, VacuumMetrics> { + /** Serial version UID. */ + private static final long serialVersionUID = 7063457745963917386L; + + /** Aggregate that the counters of every collected metrics object are added to. */ + private final VacuumMetrics m = new VacuumMetrics(); + + /** {@inheritDoc} Adds the four counters of {@code metrics} to the aggregate and always returns {@code true}; a {@code null} argument trips the assertion. */ + @Override public boolean collect(@Nullable VacuumMetrics metrics) { + assert metrics != null; + + m.addCleanupRowsCnt(metrics.cleanupRowsCount()); + m.addScannedRowsCount(metrics.scannedRowsCount()); + m.addSearchNanoTime(metrics.searchNanoTime()); + m.addCleanupNanoTime(metrics.cleanupNanoTime()); + + return true; + } + + /** {@inheritDoc} Returns the aggregate instance itself, not a copy. */ + @Override public VacuumMetrics reduce() { + return m; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java new file mode 100644 index 0000000..9a0d9e2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/VacuumTask.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc; + +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Task for cleaning a single partition: a future of {@link VacuumMetrics} that holds the snapshot and the partition to clean.
+ */ +public class VacuumTask extends GridFutureAdapter<VacuumMetrics> { + /** Snapshot. */ + private final MvccSnapshot snapshot; + + /** Partition to cleanup; marked {@link GridToStringExclude}, and {@link #toString()} passes only its ID. */ + @GridToStringExclude + private final GridDhtLocalPartition part; + + /** + * @param snapshot Snapshot. + * @param part Partition to cleanup. + */ + VacuumTask(MvccSnapshot snapshot, GridDhtLocalPartition part) { + this.snapshot = snapshot; + this.part = part; + } + + /** + * @return Snapshot. + */ + public MvccSnapshot snapshot() { + return snapshot; + } + + /** + * @return Partition to cleanup. + */ + public GridDhtLocalPartition part() { + return part; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(VacuumTask.class, this, "partId", part.id()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java new file mode 100644 index 0000000..0156c53 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryCntr.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * MVCC message that carries a single query counter. + */ +public class MvccAckRequestQueryCntr implements MvccMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Query counter. */ + private long cntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccAckRequestQueryCntr() { + // No-op. + } + + /** + * @param cntr Query counter. + */ + public MvccAckRequestQueryCntr(long cntr) { + this.cntr = cntr; + } + + /** {@inheritDoc} Always {@code false} for this message. */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} Always {@code true} for this message. */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Query counter.
+ */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} Writes the header if it is not written yet, then the {@code cntr} field; returns {@code false} as soon as a writer call returns {@code false}. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Reads the {@code cntr} field; returns {@code false} if {@code reader.isLastRead()} is {@code false} after the read. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccAckRequestQueryCntr.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 140; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op.
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccAckRequestQueryCntr.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java new file mode 100644 index 0000000..7771f4d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestQueryId.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * MVCC message that carries a single query tracker id. + */ +public class MvccAckRequestQueryId implements MvccMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Query tracker id. */ + private long qryTrackerId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccAckRequestQueryId() { + // No-op. + } + + /** + * @param qryTrackerId Query tracker Id. + */ + public MvccAckRequestQueryId(long qryTrackerId) { + this.qryTrackerId = qryTrackerId; + } + + /** {@inheritDoc} Always {@code false} for this message. */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} Always {@code true} for this message. */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Query tracker id.
+ */ + public long queryTrackerId() { + return qryTrackerId; + } + + /** {@inheritDoc} Writes the header if it is not written yet, then the {@code qryTrackerId} field; returns {@code false} as soon as a writer call returns {@code false}. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("qryTrackerId", qryTrackerId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Reads the {@code qryTrackerId} field; returns {@code false} if {@code reader.isLastRead()} is {@code false} after the read. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + qryTrackerId = reader.readLong("qryTrackerId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccAckRequestQueryId.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 145; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op.
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccAckRequestQueryId.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java new file mode 100644 index 0000000..69dfd25 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTx.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA; + +/** + * MVCC message that carries a future ID, the counter assigned to a transaction and a flags byte. + */ +public class MvccAckRequestTx implements MvccMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Bit of {@link #flags} that is set when a response message is not needed. */ + private static final int SKIP_RESPONSE_FLAG_MASK = 0x01; + + /** Future ID. */ + private long futId; + + /** Counter assigned to transaction. */ + private long txCntr; + + /** Flags; only {@link #SKIP_RESPONSE_FLAG_MASK} is used in this class. */ + private byte flags; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccAckRequestTx() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txCntr Counter assigned to transaction. + */ + public MvccAckRequestTx(long futId, long txCntr) { + this.futId = futId; + this.txCntr = txCntr; + } + + /** + * @return Query counter; in this class always {@code MVCC_COUNTER_NA}. + */ + public long queryCounter() { + return MVCC_COUNTER_NA; + } + + /** + * @return Query tracker id; in this class always {@code MVCC_TRACKER_ID_NA}. + */ + public long queryTrackerId() { + return MVCC_TRACKER_ID_NA; + } + + /** {@inheritDoc} Always {@code false} for this message. */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} Always {@code true} for this message. */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @return {@code True} if response message is not needed. + */ + public boolean skipResponse() { + return (flags & SKIP_RESPONSE_FLAG_MASK) != 0; + } + + /** + * @param val {@code True} if response message is not needed.
+ */ + public void skipResponse(boolean val) { + if (val) + flags |= SKIP_RESPONSE_FLAG_MASK; + else + flags &= ~SKIP_RESPONSE_FLAG_MASK; + } + + /** + * @return Counter assigned to transaction. + */ + public long txCounter() { + return txCntr; + } + + /** {@inheritDoc} Writes the header if it is not written yet, then {@code flags}, {@code futId} and {@code txCntr} in that order (states 0, 1, 2). */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("txCntr", txCntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Reads {@code flags}, {@code futId} and {@code txCntr} in that order (states 0, 1, 2), the same order in which they are written. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + txCntr = reader.readLong("txCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccAckRequestTx.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 137; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op.
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccAckRequestTx.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java new file mode 100644 index 0000000..99761c3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryCntr.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * {@link MvccAckRequestTx} that additionally carries a query counter. + */ +public class MvccAckRequestTxAndQueryCntr extends MvccAckRequestTx { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Counter assigned for transaction reads. */ + private long qryCntr; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccAckRequestTxAndQueryCntr() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txCntr Counter assigned to transaction update. + * @param qryCntr Counter assigned for transaction reads. + */ + public MvccAckRequestTxAndQueryCntr(long futId, long txCntr, long qryCntr) { + super(futId, txCntr); + + this.qryCntr = qryCntr; + } + + /** {@inheritDoc} */ + @Override public long queryCounter() { + return qryCntr; + } + + /** {@inheritDoc} Delegates to the superclass first, then writes {@code qryCntr} at state 3. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeLong("qryCntr", qryCntr)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Delegates to the superclass first, then reads {@code qryCntr} at state 3. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + qryCntr = reader.readLong("qryCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return
reader.afterMessageRead(MvccAckRequestTxAndQueryCntr.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 146; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccAckRequestTxAndQueryCntr.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java new file mode 100644 index 0000000..89f09db --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccAckRequestTxAndQueryId.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * {@link MvccAckRequestTx} that additionally carries a query tracker id. + */ +public class MvccAckRequestTxAndQueryId extends MvccAckRequestTx { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Query tracker id. */ + private long qryTrackerId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccAckRequestTxAndQueryId() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txCntr Counter assigned to transaction update. + * @param qryTrackerId Query tracker id. + */ + public MvccAckRequestTxAndQueryId(long futId, long txCntr, long qryTrackerId) { + super(futId, txCntr); + + this.qryTrackerId = qryTrackerId; + } + + /** {@inheritDoc} */ + @Override public long queryTrackerId() { + return qryTrackerId; + } + + /** {@inheritDoc} Delegates to the superclass first, then writes {@code qryTrackerId} at state 3. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeLong("qryTrackerId", qryTrackerId)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} Delegates to the superclass first, then reads {@code qryTrackerId} at state 3. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + qryTrackerId = reader.readLong("qryTrackerId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return
reader.afterMessageRead(MvccAckRequestTxAndQueryId.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 147; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccAckRequestTxAndQueryId.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java new file mode 100644 index 0000000..4b78c24 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccActiveQueriesMessage.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * MVCC message that carries a list of active queries. + */ +public class MvccActiveQueriesMessage implements MvccMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Active queries; left unset by the no-arg constructor. */ + private GridLongList activeQrys; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccActiveQueriesMessage() { + // No-op. + } + + /** + * @param activeQrys Active queries. + */ + public MvccActiveQueriesMessage(GridLongList activeQrys) { + this.activeQrys = activeQrys; + } + + /** + * @return Active queries. + */ + @Nullable public GridLongList activeQueries() { + return activeQrys; + } + + /** {@inheritDoc} Always {@code false} for this message. */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} Always {@code true} for this message. */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** {@inheritDoc} Writes the header if it is not written yet, then {@code activeQrys} as a nested message. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("activeQrys", activeQrys)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Reads {@code activeQrys} as a nested message. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + activeQrys = reader.readMessage("activeQrys"); + + if (!reader.isLastRead()) + return false; + +
reader.incrementState(); + + } + + return reader.afterMessageRead(MvccActiveQueriesMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 149; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccActiveQueriesMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java new file mode 100644 index 0000000..72e4c52 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccFutureResponse.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * MVCC response message that carries only a future ID. + */ +public class MvccFutureResponse implements MvccMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private long futId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccFutureResponse() { + // No-op. + } + + /** + * @param futId Future ID. + */ + public MvccFutureResponse(long futId) { + this.futId = futId; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** {@inheritDoc} Always {@code false} for this message. */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} Always {@code false} for this message. */ + @Override public boolean processedFromNioThread() { + return false; + } + + /** {@inheritDoc} Writes the header if it is not written yet, then the {@code futId} field; returns {@code false} as soon as a writer call returns {@code false}. */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} Reads the {@code futId} field; returns {@code false} if {@code reader.isLastRead()} is {@code false} after the read. */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccFutureResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 138; + } + + /** {@inheritDoc} */ + @Override
public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccFutureResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java new file mode 100644 index 0000000..6d8b3c4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccMessage.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Common interface for all MVCC-related messages.
*/
public interface MvccMessage extends Message {
    /**
     * Tells whether this message should wait for coordinator initialization.
     *
     * @return {@code True} if should wait for coordinator initialization.
     */
    public boolean waitForCoordinatorInit();

    /**
     * Tells whether this message should be processed from NIO thread.
     *
     * @return {@code True} if message should be processed from NIO thread.
     */
    public boolean processedFromNioThread();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java new file mode 100644 index 0000000..75d33a7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccQuerySnapshotRequest.java @@ -0,0 +1,130 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Request to get MVCC snapshot for a query. + */ +public class MvccQuerySnapshotRequest implements MvccMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long futId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccQuerySnapshotRequest() { + // No-op. + } + + /** + * @param futId Future ID. + */ + public MvccQuerySnapshotRequest(long futId) { + this.futId = futId; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Future ID. 
+ */ + public long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccQuerySnapshotRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 139; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccQuerySnapshotRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java new file mode 100644 index 0000000..196003c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccSnapshotResponse.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class MvccSnapshotResponse implements MvccMessage, MvccSnapshot, MvccLongList { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long futId; + + /** */ + private long crdVer; + + /** */ + private long cntr; + + /** */ + private int opCntr; + + /** */ + @GridDirectTransient + private int txsCnt; + + /** */ + private long[] txs; + + /** */ + private long cleanupVer; + + /** */ + @GridDirectTransient + private long tracking; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccSnapshotResponse() { + // No-op. + } + + /** + * @param futId Future ID. + * @param crdVer Coordinator version. + * @param cntr Counter. + * @param opCntr Operation counter. + * @param cleanupVer Cleanup version. + * @param tracking Tracking number. + */ + public void init(long futId, long crdVer, long cntr, int opCntr, long cleanupVer, long tracking) { + this.futId = futId; + this.crdVer = crdVer; + this.cntr = cntr; + this.opCntr = opCntr; + this.cleanupVer = cleanupVer; + this.tracking = tracking; + + if (txsCnt > 0 && txs.length > txsCnt) // truncate if necessary + txs = Arrays.copyOf(txs, txsCnt); + } + + /** + * @param txId Transaction counter. 
+ */ + public void addTx(long txId) { + if (txs == null) + txs = new long[4]; + else if (txs.length == txsCnt) + txs = Arrays.copyOf(txs, txs.length << 1); + + txs[txsCnt++] = txId; + } + + /** {@inheritDoc} */ + @Override public int size() { + return txsCnt; + } + + /** {@inheritDoc} */ + @Override public long get(int i) { + return txs[i]; + } + + /** {@inheritDoc} */ + @Override public boolean contains(long val) { + for (int i = 0; i < txsCnt; i++) { + if (txs[i] == val) + return true; + } + + return false; + } + + /** + * @return Tracking counter. + */ + public long tracking() { + return tracking; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return false; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public long cleanupVersion() { + return cleanupVer; + } + + /** {@inheritDoc} */ + public long counter() { + return cntr; + } + + /** {@inheritDoc} */ + @Override public int operationCounter() { + return opCntr; + } + + /** {@inheritDoc} */ + @Override public void incrementOperationCounter() { + opCntr++; + } + + /** {@inheritDoc} */ + @Override public MvccLongList activeTransactions() { + return this; + } + + /** {@inheritDoc} */ + @Override public MvccSnapshot withoutActiveTransactions() { + if (txsCnt > 0) + return new MvccSnapshotWithoutTxs(crdVer, cntr, opCntr, cleanupVer); + + return this; + } + + /** {@inheritDoc} */ + @Override public long coordinatorVersion() { + return crdVer; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("cleanupVer", cleanupVer)) + return 
false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("cntr", cntr)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("crdVer", crdVer)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeInt("opCntr", opCntr)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeLongArray("txs", txs)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cleanupVer = reader.readLong("cleanupVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + cntr = reader.readLong("cntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + crdVer = reader.readLong("crdVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + opCntr = reader.readInt("opCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + txs = reader.readLongArray("txs"); + + if (!reader.isLastRead()) + return false; + + txsCnt = txs != null ? txs.length : 0; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccSnapshotResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 141; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 6; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccSnapshotResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java new file mode 100644 index 0000000..cd30eb8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccTxSnapshotRequest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Request to get MVCC snapshot for a new transaction. + */ +public class MvccTxSnapshotRequest implements MvccMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long futId; + + /** + * Required by {@link GridIoMessageFactory}. + */ + public MvccTxSnapshotRequest() { + // No-op. + } + + /** + * @param futId Future ID. + */ + public MvccTxSnapshotRequest(long futId) { + this.futId = futId; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** + * @return Future ID. 
+ */ + public long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(MvccTxSnapshotRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 136; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccTxSnapshotRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java new file mode 100644 index 0000000..ae57507 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccWaitTxsRequest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.mvcc.msg; + +import java.nio.ByteBuffer; + +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class MvccWaitTxsRequest implements MvccMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long futId; + + /** */ + private GridLongList txs; + + /** + * + */ + public MvccWaitTxsRequest() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txs Transactions to wait for. + */ + public MvccWaitTxsRequest(long futId, GridLongList txs) { + assert txs != null && txs.size() > 0 : txs; + + this.futId = futId; + this.txs = txs; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @return Transactions to wait for. + */ + public GridLongList transactions() { + return txs; + } + + /** {@inheritDoc} */ + @Override public boolean waitForCoordinatorInit() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean processedFromNioThread() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("txs", txs)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + 
futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + txs = reader.readMessage("txs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(MvccWaitTxsRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 142; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccWaitTxsRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java new file mode 100644 index 0000000..92aff7b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxKey.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc.txlog; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class TxKey { + /** */ + private final long major; + + /** */ + private final long minor; + + /** + * @param major Major version. + * @param minor Minor version + */ + public TxKey(long major, long minor) { + this.major = major; + this.minor = minor; + } + + /** + * @return Major version. + */ + public long major() { + return major; + } + + /** + * @return Minor version. + */ + public long minor() { + return minor; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || o.getClass() != TxKey.class) return false; + + TxKey txKey = (TxKey) o; + + return major == txKey.major && minor == txKey.minor; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = (int) (major ^ (major >>> 32)); + result = 31 * result + (int) (minor ^ (minor >>> 32)); + return result; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java new file mode 100644 index 0000000..905bfc4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java @@ -0,0 +1,584 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc.txlog; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; +import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl; +import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; +import org.apache.ignite.internal.util.IgniteTree; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA; + +/** + * + */ +public class TxLog implements DbCheckpointListener { + /** */ + public static final String TX_LOG_CACHE_NAME = "TxLog"; + + /** */ + public static final int TX_LOG_CACHE_ID = CU.cacheId(TX_LOG_CACHE_NAME); + + /** */ + private static final TxKey LOWEST = new TxKey(0, 0); + + /** */ + private final IgniteCacheDatabaseSharedManager mgr; + + /** */ + private ReuseListImpl reuseList; + + /** */ + private TxLogTree tree; + + /** */ + private ConcurrentMap<TxKey, Sync> keyMap = new ConcurrentHashMap<>(); + + /** + * + * @param ctx Kernal context. + * @param mgr Database shared manager. + */ + public TxLog(GridKernalContext ctx, IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException { + this.mgr = mgr; + + init(ctx); + } + + /** + * + * @param ctx Kernal context. + * @throws IgniteCheckedException If failed. 
*/
    private void init(GridKernalContext ctx) throws IgniteCheckedException {
        // Persistent mode: the tree root and the reuse list root are kept in the meta page
        // of partition 0 of the TxLog cache.
        if (CU.isPersistenceEnabled(ctx.config())) {
            mgr.checkpointReadLock();

            try {
                IgniteWriteAheadLogManager wal = ctx.cache().context().wal();
                PageMemoryEx pageMemory = (PageMemoryEx)mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();

                long partMetaId = pageMemory.partitionMetaPageId(TX_LOG_CACHE_ID, 0);
                long partMetaPage = pageMemory.acquirePage(TX_LOG_CACHE_ID, partMetaId);

                long treeRoot, reuseListRoot;

                // Set only when the meta page is formatted by this call.
                boolean isNew = false;

                try {
                    long pageAddr = pageMemory.writeLock(TX_LOG_CACHE_ID, partMetaId, partMetaPage);

                    try {
                        if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
                            // Initialize new page.
                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();

                            io.initNewPage(pageAddr, partMetaId, pageMemory.pageSize());

                            treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, 0, PageMemory.FLAG_DATA);
                            reuseListRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, 0, PageMemory.FLAG_DATA);

                            assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
                            assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;

                            // Store both freshly allocated roots in the meta page.
                            io.setTreeRoot(pageAddr, treeRoot);
                            io.setReuseListRoot(pageAddr, reuseListRoot);

                            // Log the meta page initialization to WAL when a delta record is needed.
                            if (PageHandler.isWalDeltaRecordNeeded(pageMemory, TX_LOG_CACHE_ID, partMetaId, partMetaPage, wal, null))
                                wal.log(new MetaPageInitRecord(
                                    TX_LOG_CACHE_ID,
                                    partMetaId,
                                    io.getType(),
                                    io.getVersion(),
                                    treeRoot,
                                    reuseListRoot
                                ));

                            isNew = true;
                        }
                        else {
                            // The meta page is already of the partition-meta type: read the stored roots.
                            PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);

                            treeRoot = io.getTreeRoot(pageAddr);
                            reuseListRoot = io.getReuseListRoot(pageAddr);

                            assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA :
                                U.hexLong(treeRoot) + ", part=" + 0 + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID;
                            assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA :
                                U.hexLong(reuseListRoot) + ", part=" + 0 + ", TX_LOG_CACHE_ID=" + TX_LOG_CACHE_ID;
                        }
                    }
                    finally {
                        // NOTE(review): the last argument is presumably the dirty flag, i.e. the page is
                        // marked dirty only when it was formatted above - confirm against PageMemoryEx.
                        pageMemory.writeUnlock(TX_LOG_CACHE_ID, partMetaId, partMetaPage, null, isNew);
                    }
                }
                finally {
                    pageMemory.releasePage(TX_LOG_CACHE_ID, partMetaId, partMetaPage);
                }

                reuseList = new ReuseListImpl(
                    TX_LOG_CACHE_ID,
                    TX_LOG_CACHE_NAME,
                    pageMemory,
                    wal,
                    reuseListRoot,
                    isNew);

                tree = new TxLogTree(pageMemory, wal, treeRoot, reuseList, ctx.failure(), isNew);

                // Registered so that onCheckpointBegin() saves the reuse list metadata.
                ((GridCacheDatabaseSharedManager)mgr).addCheckpointListener(this);
            }
            finally {
                mgr.checkpointReadUnlock();
            }
        }
        else {
            // Non-persistent mode: the data region's reuse list is used and no WAL is passed to the tree.
            PageMemory pageMemory = mgr.dataRegion(TX_LOG_CACHE_NAME).pageMemory();
            ReuseList reuseList1 = mgr.reuseList(TX_LOG_CACHE_NAME);

            long treeRoot;

            // Prefer a recycled page for the tree root; allocate a new index page if none is available.
            if ((treeRoot = reuseList1.takeRecycledPage()) == 0L)
                treeRoot = pageMemory.allocatePage(TX_LOG_CACHE_ID, INDEX_PARTITION, FLAG_IDX);

            tree = new TxLogTree(pageMemory, null, treeRoot, reuseList1, ctx.failure(), true);
        }
    }

    /** {@inheritDoc} */
    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
        // The reuseList field is assigned only in the persistent branch of init(), which is also
        // the only place where this object is registered as a checkpoint listener.
        reuseList.saveMetadata();
    }

    /**
     * Looks up the transaction state stored for the given version pair.
     *
     * @param major Major version.
     * @param minor Minor version.
     * @return Transaction state for given version.
     * @throws IgniteCheckedException If failed.
     */
    public byte get(long major, long minor) throws IgniteCheckedException {
        return get(new TxKey(major, minor));
    }

    /**
     * Looks up the transaction state stored for the given key.
     *
     * @param key Transaction key.
     * @return Transaction state for given version, or {@code TxState.NA} if the tree has no row for the key.
     * @throws IgniteCheckedException If failed.
     */
    public byte get(TxKey key) throws IgniteCheckedException {
        TxRow row = tree.findOne(key);

        return row == null ? TxState.NA : row.state();
    }

    /**
     *
     * @param key TxKey.
     * @param state Transaction state for given version.
     * @param primary Flag if this is a primary node.
     * @throws IgniteCheckedException If failed.
*/
    public void put(TxKey key, byte state, boolean primary) throws IgniteCheckedException {
        // Per-key monitor: updates of the same key are serialized, different keys do not block each other.
        Sync sync = syncObject(key);

        try {
            mgr.checkpointReadLock();

            try {
                synchronized (sync) {
                    tree.invoke(key, null, new TxLogUpdateClosure(key.major(), key.minor(), state, primary));
                }
            }
            finally {
                mgr.checkpointReadUnlock();
            }
        } finally {
            // Always drop the reference taken by syncObject(), even if the update failed.
            evict(key, sync);
        }
    }

    /**
     * Removes all records less or equals to the given version.
     * Keys are first collected by a tree traversal starting at the lowest key and then removed one by one.
     *
     * @param major Major version.
     * @param minor Minor version.
     * @throws IgniteCheckedException If failed.
     */
    public void removeUntil(long major, long minor) throws IgniteCheckedException {
        // The closure is a TxKey itself, so it serves both as the upper bound and as the row collector.
        TraversingClosure clo = new TraversingClosure(major, minor);

        tree.iterate(LOWEST, clo, clo);

        // The list is created lazily by the closure; null means nothing was visited.
        if (clo.rows != null) {
            for (TxKey row : clo.rows) {
                remove(row);
            }
        }
    }

    /**
     * Removes a single key from the tree under the per-key monitor and the checkpoint read lock.
     *
     * @param key Key to remove.
     * @throws IgniteCheckedException If failed.
     */
    private void remove(TxKey key) throws IgniteCheckedException {
        Sync sync = syncObject(key);

        try {
            mgr.checkpointReadLock();

            try {
                synchronized (sync) {
                    tree.removex(key);
                }
            }
            finally {
                mgr.checkpointReadUnlock();
            }
        } finally {
            // Always drop the reference taken by syncObject().
            evict(key, sync);
        }
    }

    /**
     * Returns the monitor object for the key with its usage counter incremented,
     * creating and registering a new one if the key has none. Must be paired with {@link #evict(TxKey, Sync)}.
     *
     * @param key Key.
     * @return Monitor object for the key.
     */
    private Sync syncObject(TxKey key) {
        Sync sync = keyMap.get(key);

        while (true) {
            if (sync == null) {
                // A new Sync starts with counter == 1, which already accounts for this caller.
                Sync old = keyMap.putIfAbsent(key, sync = new Sync());

                if (old == null)
                    return sync;
                else
                    sync = old; // Lost the race: try to acquire the instance registered by another thread.
            }
            else {
                int cntr = sync.counter;

                // Acquire only while the counter is positive; zero means the last user has released it.
                while (cntr > 0) {
                    if (sync.casCounter(cntr, cntr + 1))
                        return sync;

                    cntr = sync.counter;
                }

                // Counter reached zero: re-read the map, which yields null once evict() has removed the entry.
                sync = keyMap.get(key);
            }
        }
    }

    /**
     * Releases one reference to the monitor object and removes it from the map when the last reference is dropped.
     *
     * @param key Key.
     * @param sync Monitor object previously returned by {@link #syncObject(TxKey)} for the key.
     */
    private void evict(TxKey key, Sync sync) {
        assert sync != null;

        int cntr = sync.counter;

        while (true) {
            assert cntr > 0;

            // Retry the decrement until the CAS succeeds.
            if (!sync.casCounter(cntr, cntr - 1)) {
                cntr = sync.counter;

                continue;
            }

            // This call released the last reference: unregister exactly this instance.
            if (cntr == 1) {
                boolean removed = keyMap.remove(key, sync);

                assert removed;
            }

            break;
        }
    }

    /**
     *
     */
    private static class TraversingClosure extends TxKey implements
BPlusTree.TreeRowClosure<TxKey, TxRow> { + /** Keys collected so far; stays {@code null} until the first {@code apply} call. */ + private List<TxKey> rows; + + /** + * Creates the closure; both versions are passed to the {@code TxKey} super constructor. + * + * @param major Major version. + * @param minor Minor version. + */ + TraversingClosure(long major, long minor) { + super(major, minor); + } + + /** + * {@inheritDoc} + * + * Reads the major and minor version of the item at {@code idx} and adds them to {@code rows} + * as a new {@code TxKey}. Always returns {@code true}. + */ + @Override public boolean apply(BPlusTree<TxKey, TxRow> tree, BPlusIO<TxKey> io, long pageAddr, + int idx) throws IgniteCheckedException { + + if (rows == null) + rows = new ArrayList<>(); + + TxLogIO logIO = (TxLogIO)io; + int offset = io.offset(idx); + + rows.add(new TxKey(logIO.getMajor(pageAddr, offset), logIO.getMinor(pageAddr, offset))); + + return true; + } + } + + /** Synchronization object with a counter that is updated via compare-and-set. */ + private static class Sync { + /** Field updater used for compare-and-set on {@code counter}. */ + private static final AtomicIntegerFieldUpdater<Sync> UPD = AtomicIntegerFieldUpdater.newUpdater(Sync.class, "counter"); + + /** Counter; a new instance starts at 1. */ + volatile int counter = 1; + + /** + * Atomically sets {@code counter} to {@code upd} if it currently equals {@code old}. + * + * @param old Expected value. + * @param upd New value. + * @return {@code true} if the counter was updated. + */ + boolean casCounter(int old, int upd) { + return UPD.compareAndSet(this, old, upd); + } + } + + /** + * TxLog update closure. + * Decides whether a requested state change results in a {@code PUT}, a {@code NOOP} + * or an {@code IllegalStateException}, based on the state of the existing row (if any). + */ + private static final class TxLogUpdateClosure implements IgniteTree.InvokeClosure<TxRow> { + /** Major version (coordinator version). */ + private final long major; + + /** Minor version (counter). */ + private final long minor; + + /** Requested new transaction state. */ + private final byte newState; + + /** Flag if this is primary node. */ + private final boolean primary; + + /** Tree operation chosen by {@code call}; {@code null} until a decision is made. */ + private IgniteTree.OperationType treeOp; + + /** + * Creates the closure. The arguments are checked with an {@code assert} only. + * + * @param major Coordinator version. + * @param minor Counter. + * @param newState New transaction state. + * @param primary Flag if this is primary node.
+ */ + TxLogUpdateClosure(long major, long minor, byte newState, boolean primary) { + assert major > MVCC_CRD_COUNTER_NA && minor > MVCC_COUNTER_NA && newState != TxState.NA; + this.major = major; + this.minor = minor; + this.newState = newState; + this.primary = primary; + } + + /** + * {@inheritDoc} + * + * A {@code null} row is treated as a valid update. Otherwise the check method matching the current + * state of the row is invoked; an unrecognized current state results in an {@code IllegalStateException}. + */ + @Override public void call(@Nullable TxRow row) { + if (row == null) { + valid(); + + return; + } + + byte currState = row.state(); + + switch (currState) { + case TxState.NA: + checkNa(currState); + + break; + + case TxState.PREPARED: + checkPrepared(currState); + + break; + + case TxState.COMMITTED: + checkCommitted(currState); + + break; + + case TxState.ABORTED: + checkAborted(currState); + + break; + + default: + throw new IllegalStateException("Unknown tx state: " + currState); + } + } + + /** + * {@inheritDoc} + * + * Returns a row carrying the requested state when the chosen operation is {@code PUT}, otherwise {@code null}. + */ + @Override public TxRow newRow() { + return treeOp == IgniteTree.OperationType.PUT ? new TxRow(major, minor, newState) : null; + } + + /** {@inheritDoc} */ + @Override public IgniteTree.OperationType operationType() { + return treeOp; + } + + /** + * Checks update possibility for {@code TxState.NA} tx status. + * {@code ABORTED} and {@code PREPARED} are accepted; every other new state, + * including {@code COMMITTED}, is rejected. + * + * @param currState Current tx state. + */ + private void checkNa(byte currState) { + switch (newState) { + case TxState.ABORTED: + case TxState.PREPARED: + valid(); + + break; + + case TxState.COMMITTED: + invalid(currState); // TODO IGNITE-8445 + + break; + + default: + invalid(currState); + } + } + + /** + * Checks update possibility for {@code TxState.PREPARED} status. + * {@code ABORTED} and {@code COMMITTED} are accepted, a repeated {@code PREPARED} is ignored, + * anything else is rejected. + * + * @param currState Current tx state. + */ + private void checkPrepared(byte currState) { + switch (newState) { + case TxState.ABORTED: + case TxState.COMMITTED: + valid(); + + break; + + case TxState.PREPARED: + ignore(); + + break; + + default: + invalid(currState); + } + } + + /** + * Checks update possibility for {@code TxState.COMMITTED} status. + * A repeated {@code COMMITTED} is ignored; {@code PREPARED} is ignored only on a primary node; + * anything else is rejected. + * + * @param currState Current tx state.
+ */ + private void checkCommitted(byte currState) { + switch (newState) { + case TxState.COMMITTED: + ignore(); + + break; + + case TxState.PREPARED: + if (primary) + ignore(); // In case when remote tx has updated the current state before. + else + invalid(currState); + + break; + + default: + invalid(currState); + } + } + + /** + * Checks update possibility for {@code TxState.ABORTED} status. + * A repeated {@code ABORTED} is ignored; {@code PREPARED} is ignored only on a primary node; + * anything else is rejected. + * + * @param currState Current tx state. + */ + private void checkAborted(byte currState) { + switch (newState) { + case TxState.ABORTED: + ignore(); + + break; + + case TxState.PREPARED: + if (primary) + ignore(); // In case when remote tx has updated the current state before. + else + invalid(currState); + + break; + + default: + invalid(currState); + } + } + + /** + * Action for valid tx status update: selects the {@code PUT} tree operation. + */ + private void valid() { + assert treeOp == null; + + treeOp = IgniteTree.OperationType.PUT; + } + + /** + * Action for invalid tx status update: always throws. + * + * @param currState Current tx state, reported in the exception message. + * @throws IllegalStateException Always. + */ + private void invalid(byte currState) { + assert treeOp == null; + + throw new IllegalStateException("Unexpected new transaction state. [currState=" + + currState + ", newState=" + newState + ", cntr=" + minor +']'); + } + + /** + * Action for ignoring tx status update: selects the {@code NOOP} tree operation.
+ */ + private void ignore() { + assert treeOp == null; + + treeOp = IgniteTree.OperationType.NOOP; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java new file mode 100644 index 0000000..e952b43 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogIO.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.mvcc.txlog; + +/** + * Access to transaction log items addressed by page address and item offset: comparison with a key + * plus getters and setters for the major version, the minor version and the transaction state. + */ +public interface TxLogIO { + /** + * Compares the item at the given offset with the given key. + * + * @param pageAddr Page address. + * @param off Item offset. + * @param row Row to compare with. + * @return Comparison result. + */ + int compare(long pageAddr, int off, TxKey row); + + /** + * Gets the major version of the item at the given offset. + * + * @param pageAddr Page address. + * @param off Item offset.
+ * @return Major version. + */ + long getMajor(long pageAddr, int off); + + /** + * Sets the major version of the item at the given offset. + * + * @param pageAddr Page address. + * @param off Item offset. + * @param major Major version. + */ + void setMajor(long pageAddr, int off, long major); + + /** + * Gets the minor version of the item at the given offset. + * + * @param pageAddr Page address. + * @param off Item offset. + * @return Minor version. + */ + long getMinor(long pageAddr, int off); + + /** + * Sets the minor version of the item at the given offset. + * + * @param pageAddr Page address. + * @param off Item offset. + * @param minor Minor version. + */ + void setMinor(long pageAddr, int off, long minor); + + /** + * Gets the transaction state of the item at the given offset. + * + * @param pageAddr Page address. + * @param off Item offset. + * @return Transaction state. + */ + byte getState(long pageAddr, int off); + + /** + * Sets the transaction state of the item at the given offset. + * + * @param pageAddr Page address. + * @param off Item offset. + * @param state Transaction state. + */ + void setState(long pageAddr, int off, byte state); +}