alex-plekhanov commented on a change in pull request #7941:
URL: https://github.com/apache/ignite/pull/7941#discussion_r494384542
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
##########
@@ -821,11 +1181,57 @@ private void writeKeysToMetaStore(boolean writeAll) throws IgniteCheckedExceptio
if (writeAll)
metaStorage.write(MASTER_KEY_NAME_PREFIX, getSpi().getMasterKeyName());
- for (Map.Entry<Integer, Serializable> entry : grpEncKeys.entrySet()) {
- if (!writeAll && metaStorage.read(ENCRYPTION_KEY_PREFIX + entry.getKey()) != null)
+ if (!reencryptGroupsForced.isEmpty())
+ writeTrackedWalIdxsToMetaStore();
+
+ for (Integer grpId : grpKeys.groupIds()) {
+ if (!writeAll && !reencryptGroupsForced.containsKey(grpId) &&
+ metaStorage.read(ENCRYPTION_KEYS_PREFIX + grpId) != null)
continue;
- writeToMetaStore(entry.getKey(), getSpi().encryptKey(entry.getValue()));
+ writeGroupKeysToMetaStore(grpId);
+ }
+ }
+
+ /**
+ * Writes cache group encryption keys to metastore.
+ *
+ * @param grpId Cache group ID.
+ */
+ private void writeGroupKeysToMetaStore(int grpId) throws IgniteCheckedException {
+ assert Thread.holdsLock(metaStorageMux);
+
+ if (metaStorage == null || !writeToMetaStoreEnabled || stopped)
+ return;
+
+ List<GroupKeyEncrypted> keysEncrypted = withMasterKeyChangeReadLock(() -> grpKeys.getAll(grpId));
Review comment:
Different lock order, so a deadlock is possible. This method is invoked
under metaStorageMux and then acquires the master key change lock, but in
doChangeMasterKey() the master key change lock is acquired first and then
metaStorageMux.
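A minimal sketch of the consistent-ordering fix, assuming a `ReentrantReadWriteLock` stands in for the master key change lock and a plain monitor stands in for `metaStorageMux`; `LockOrderSketch` and its methods are hypothetical, not the PR's code. Both paths take the master key change lock first and `metaStorageMux` second, so no wait cycle can form.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical model of the two locks; not Ignite's GridEncryptionManager. */
final class LockOrderSketch {
    /** Stand-in for the master key change lock. */
    private final ReadWriteLock masterKeyChangeLock = new ReentrantReadWriteLock();

    /** Stand-in for metaStorageMux. */
    private final Object metaStorageMux = new Object();

    /** Guarded by metaStorageMux. */
    private int masterKeyVersion;

    /** Guarded by metaStorageMux. */
    private int writes;

    /** Group key write path: master key change read lock first, then metaStorageMux. */
    void writeGroupKeys() {
        masterKeyChangeLock.readLock().lock();
        try {
            synchronized (metaStorageMux) {
                writes++;
            }
        }
        finally {
            masterKeyChangeLock.readLock().unlock();
        }
    }

    /** Master key change path: write lock first, then metaStorageMux (same order). */
    void changeMasterKey() {
        masterKeyChangeLock.writeLock().lock();
        try {
            synchronized (metaStorageMux) {
                masterKeyVersion++;
            }
        }
        finally {
            masterKeyChangeLock.writeLock().unlock();
        }
    }

    int masterKeyVersion() {
        synchronized (metaStorageMux) {
            return masterKeyVersion;
        }
    }

    int writes() {
        synchronized (metaStorageMux) {
            return writes;
        }
    }

    /** Runs both paths concurrently; returns null if either thread is still running after 10 s. */
    static LockOrderSketch runConcurrently(int iterations) {
        LockOrderSketch s = new LockOrderSketch();

        Thread writer = new Thread(() -> { for (int i = 0; i < iterations; i++) s.writeGroupKeys(); });
        Thread changer = new Thread(() -> { for (int i = 0; i < iterations; i++) s.changeMasterKey(); });

        // Daemon threads, so a stuck thread cannot keep the JVM alive.
        writer.setDaemon(true);
        changer.setDaemon(true);
        writer.start();
        changer.start();

        try {
            writer.join(10_000);
            changer.join(10_000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }

        return writer.isAlive() || changer.isAlive() ? null : s;
    }
}
```

With the inverted order from the diff (monitor first, then the read lock), each thread could end up holding one lock while waiting forever for the other.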
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
##########
@@ -233,7 +233,25 @@
TRACKING_PAGE_REPAIR_DELTA(61, PHYSICAL),
/** Atomic out-of-order update. */
- OUT_OF_ORDER_UPDATE(62, LOGICAL);
+ OUT_OF_ORDER_UPDATE(62, LOGICAL),
+
+ /** Encrypted WAL-record. */
+ ENCRYPTED_RECORD_V2(63, PHYSICAL),
+
+ /** Ecnrypted data record. */
+ ENCRYPTED_DATA_RECORD_V2(64, LOGICAL),
+
+ /** Master key change record containing multiple keys for single cache group. */
+ MASTER_KEY_CHANGE_RECORD_V2(65, LOGICAL),
+
+ /** Logical record to restart reencryption with the latest encryption key. */
+ REENCRYPTION_START_RECORD(66, LOGICAL),
+
+ /** Partition meta page delta record includes encryption status data. */
+ PARTITION_META_PAGE_UPDATE_COUNTERS_V3(67, PHYSICAL),
Review comment:
Maybe `PARTITION_META_PAGE_DELTA_RECORD_V3`? WDYT?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
##########
@@ -483,16 +534,18 @@ public void onLocalJoin() {
if (dataBag.isJoiningNodeClient() || dataBag.commonDataCollectedFor(ENCRYPTION_MGR.ordinal()))
return;
- HashMap<Integer, byte[]> knownEncKeys = knownEncryptionKeys();
+ HashMap<Integer, GroupKeyEncrypted> knownEncKeys = grpKeys.getAll();
Review comment:
As far as I understand, the master key can be changed concurrently with this
method, and getAll() encrypts the keys with the master key, so we should call
grpKeys.getAll() under the master key change read lock (please pay attention to
lock ordering to avoid deadlocks). This applies at least to this method, and perhaps
to other methods too (please also review the usages of the getAll(grpId) and
setGroupKeys methods; perhaps they should be invoked under the lock as well).
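A minimal sketch of why the read lock matters, assuming (hypothetically) that "encrypting" a group key just tags it with the current master key name; `GroupKeysSketch` is illustrative and does not model Ignite's real key classes. Without the read lock, a master key change in the middle of the loop could yield a snapshot that mixes keys encrypted with two different master keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical model: "encrypting" a group key just prefixes it with the current master key name. */
final class GroupKeysSketch {
    private final ReentrantReadWriteLock masterKeyChangeLock = new ReentrantReadWriteLock();

    /** Filled in the constructor and never modified afterwards. */
    private final Map<Integer, String> plainKeys = new HashMap<>();

    private volatile String masterKeyName = "mk-0";

    GroupKeysSketch(int groups) {
        for (int grpId = 0; grpId < groups; grpId++)
            plainKeys.put(grpId, "key-" + grpId);
    }

    /** Analogue of getAll(): every key is encrypted with the master key current at that moment. */
    Map<Integer, String> getAll() {
        // Holding the read lock blocks changeMasterKey() until the whole snapshot is built.
        masterKeyChangeLock.readLock().lock();
        try {
            Map<Integer, String> res = new HashMap<>();

            for (Map.Entry<Integer, String> e : plainKeys.entrySet())
                res.put(e.getKey(), masterKeyName + ":" + e.getValue());

            return res;
        }
        finally {
            masterKeyChangeLock.readLock().unlock();
        }
    }

    void changeMasterKey(String newName) {
        masterKeyChangeLock.writeLock().lock();
        try {
            masterKeyName = newName;
        }
        finally {
            masterKeyChangeLock.writeLock().unlock();
        }
    }

    /** True if every key in the snapshot was encrypted with the same master key. */
    static boolean consistent(Map<Integer, String> snapshot) {
        return snapshot.values().stream()
            .map(v -> v.substring(0, v.indexOf(':')))
            .distinct()
            .count() <= 1;
    }
}
```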
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
##########
@@ -2863,11 +2869,16 @@ private RestoreLogicalState applyLogicalUpdates(
break;
- case MASTER_KEY_CHANGE_RECORD:
+ case MASTER_KEY_CHANGE_RECORD_V2:
Review comment:
Let's also keep `case MASTER_KEY_CHANGE_RECORD:`
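A sketch of keeping both labels, assuming both record versions can be decoded into the same in-memory change (only the two `MASTER_KEY_CHANGE_RECORD*` names come from the diff; the enum, class and return values are hypothetical). WAL written before an upgrade may presumably still contain the old record type, so dropping the old label would silently skip it during recovery.

```java
/** Hypothetical subset of WAL record types; only the first two names come from the diff. */
enum RecordTypeSketch {
    MASTER_KEY_CHANGE_RECORD,
    MASTER_KEY_CHANGE_RECORD_V2,
    DATA_RECORD
}

final class WalReplaySketch {
    /** Returns the action taken for a record during logical recovery. */
    static String apply(RecordTypeSketch type) {
        switch (type) {
            case MASTER_KEY_CHANGE_RECORD:    // may still be present in WAL written before the upgrade
            case MASTER_KEY_CHANGE_RECORD_V2: // written by the new code
                return "applyMasterKeyChange";

            default:
                return "skip";
        }
    }
}
```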
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
##########
@@ -811,6 +1124,53 @@ private void sendGenerateEncryptionKeyRequest(GenerateEncryptionKeyFuture fut) t
ctx.io().sendToGridTopic(rndNode.id(), TOPIC_GEN_ENC_KEY, req, SYSTEM_POOL);
}
+ /**
+ * @param grpIds Cache group IDs.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void startReencryption(Collection<Integer> grpIds) throws IgniteCheckedException {
+ if (pageScanner.disabled())
+ return;
+
+ for (int grpId : grpIds) {
+ IgniteInternalFuture<?> fut = pageScanner.schedule(grpId);
+
+ fut.listen(f -> {
+ try {
+ f.get();
Review comment:
Because exception-driven flow control is an anti-pattern.
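A sketch of the alternative, using `java.util.concurrent.CompletableFuture` as a stand-in for `IgniteInternalFuture` (an assumption; the PR's own listener body is not shown here): the callback branches on the completion error instead of calling `get()` and catching what it throws. `ListenSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical listener registration; CompletableFuture stands in for IgniteInternalFuture. */
final class ListenSketch {
    /** Outcomes recorded by the callbacks; not thread-safe, fine for a single-threaded demo. */
    final List<String> log = new ArrayList<>();

    void listen(int grpId, CompletableFuture<Void> fut) {
        // Branch on the completion error instead of calling get() and catching what it throws.
        fut.whenComplete((res, err) -> {
            if (err != null)
                log.add("grp " + grpId + " failed: " + err.getMessage());
            else
                log.add("grp " + grpId + " done");
        });
    }
}
```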
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateIndexDataRecord.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIndexMetaIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Meta page delta record, includes encryption status data.
+ */
+public class MetaPageUpdateIndexDataRecord extends PageDeltaRecord {
+ /** Index of the last reencrypted page. */
+ private int encryptPageIdx;
+
+ /** Total pages to be reencrypted. */
+ private int encryptPageCnt;
+
+ /**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param encryptPageIdx Index of the last reencrypted page.
+ * @param encryptPageCnt Total pages to be reencrypted.
+ */
+ public MetaPageUpdateIndexDataRecord(int grpId, long pageId, int encryptPageIdx, int encryptPageCnt) {
+ super(grpId, pageId);
+
+ this.encryptPageIdx = encryptPageIdx;
+ this.encryptPageCnt = encryptPageCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+ PageIndexMetaIO io = PageIndexMetaIO.VERSIONS.forPage(pageAddr);
Review comment:
Perhaps we can encounter a page that has not been upgraded yet here (in case the
node failed after the page upgrade and before the first checkpoint).
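One possible handling, sketched under loudly hypothetical assumptions: a made-up page layout with a version `short` at offset 0 and the two reencryption fields at offsets 16 and 20 (this is NOT Ignite's real `PageIndexMetaIO` layout). The delta upgrades the page first if recovery hands it an old-format page, instead of writing the new fields into a layout that lacks them.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical meta page layout (NOT Ignite's real PageIndexMetaIO layout):
 * offset 0: short version; version 2 adds two ints at offsets 16 and 20.
 */
final class MetaPageDeltaSketch {
    static final int VER_OFF = 0;
    static final int ENCRYPT_IDX_OFF = 16;
    static final int ENCRYPT_CNT_OFF = 20;
    static final short V1 = 1;
    static final short V2 = 2;

    /** Upgrades an old-format page in place: initializes the new fields and bumps the version. */
    static void upgrade(ByteBuffer page) {
        page.putInt(ENCRYPT_IDX_OFF, 0);
        page.putInt(ENCRYPT_CNT_OFF, 0);
        page.putShort(VER_OFF, V2);
    }

    /** Applies the delta; upgrades first if recovery hands us a page that was never upgraded on disk. */
    static void applyDelta(ByteBuffer page, int encryptPageIdx, int encryptPageCnt) {
        if (page.getShort(VER_OFF) < V2)
            upgrade(page);

        page.putInt(ENCRYPT_IDX_OFF, encryptPageIdx);
        page.putInt(ENCRYPT_CNT_OFF, encryptPageCnt);
    }
}
```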
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/util/BasicRateLimiterTest.java
##########
@@ -0,0 +1,119 @@
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Rate limiter tests.
+ */
+public class BasicRateLimiterTest {
Review comment:
Not included in any test suite.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
##########
@@ -627,22 +720,104 @@ public void groupKey(int grpId, byte[] encGrpKey) {
return withMasterKeyChangeReadLock(() -> getSpi().getMasterKeyName());
}
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> changeCacheGroupKey(Collection<String> cacheOrGrpNames) {
+ A.notEmpty(cacheOrGrpNames, "cacheOrGrpNames");
+
+ synchronized (opsMux) {
+ if (stopped) {
+ return new IgniteFinishedFutureImpl<>(new IgniteException("Cache group key change was rejected. " +
+ "Node is stopping."));
+ }
+
+ return grpKeyChangeProc.start(cacheOrGrpNames);
+ }
+ }
+
+ /**
+ * @param grpIds Cache group IDs.
+ * @param keyIds Encryption key IDs.
+ * @param keys Encryption keys.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected void changeCacheGroupKeyLocal(int[] grpIds, byte[] keyIds, byte[][] keys) throws IgniteCheckedException {
+ Map<Integer, Byte> encryptionStatus = U.newHashMap(grpIds.length);
+
+ for (int i = 0; i < grpIds.length; i++)
+ encryptionStatus.put(grpIds[i], keyIds[i]);
+
+ WALPointer ptr = ctx.cache().context().wal().log(new ReencryptionStartRecord(encryptionStatus));
+
+ if (ptr != null)
+ ctx.cache().context().wal().flush(ptr, false);
+
+ for (int i = 0; i < grpIds.length; i++) {
+ int grpId = grpIds[i];
+
+ CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
+
+ if (grp == null)
+ continue;
+
+ int newKeyId = keyIds[i] & 0xff;
+
+ synchronized (metaStorageMux) {
+ // Set new key as key for writing.
+ GroupKey prevGrpKey = grpKeys.changeActiveKey(grpId, newKeyId);
+
+ writeGroupKeysToMetaStore(grpId);
+
+ if (ptr != null) {
+ grpKeys.reserveWalKey(grpId, prevGrpKey.unsignedId(), ctx.cache().context().wal().currentSegment());
+
+ writeTrackedWalIdxsToMetaStore();
+ }
+ }
+
+ reencryptGroups.put(grpId, pageScanner.pagesCount(grp));
+
+ if (log.isInfoEnabled())
+ log.info("New encryption key for group was added [grpId=" + grpId + ", keyId=" + newKeyId + "]");
+ }
+
+ startReencryption(encryptionStatus.keySet());
+ }
+
+ /**
+ * @param grpId Cache group ID.
+ * @return Future that will be completed when reencryption of the specified group is finished.
+ */
+ public IgniteInternalFuture<Void> reencryptionFuture(int grpId) {
+ return pageScanner.statusFuture(grpId);
+ }
+
+ /**
+ * @param grpId Cache group ID.
+ * @return {@code True} If the specified cache group should be re-encrypted.
+ */
+ public boolean reencryptionRequired(int grpId) {
Review comment:
All usages of this method want to check that reencryption is in progress.
Let's rename it to something like `reencryptionInProgress`.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/util/BasicRateLimiterTest.java
##########
@@ -0,0 +1,119 @@
+public class BasicRateLimiterTest {
+ /**
+ * Check rate limit with multiple threads.
+ */
+ @Test
+ public void checkLimitMultithreaded() throws Exception {
+ int opsPerSec = 1_000;
+ int totalOps = 10_000;
+
+ BasicRateLimiter limiter = new BasicRateLimiter(opsPerSec);
+
+ int threads = Runtime.getRuntime().availableProcessors();
+
+ CyclicBarrier ready = new CyclicBarrier(threads + 1);
+
+ AtomicInteger cntr = new AtomicInteger();
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+ ready.await();
+
+ do {
+ limiter.acquire(1);
+ }
+ while (!Thread.currentThread().isInterrupted() && cntr.incrementAndGet() < totalOps);
+
+ return null;
+ }, threads, "worker");
+
+ ready.await();
+
+ long startTime = System.currentTimeMillis();
+
+ fut.get();
+
+ long timeSpent = System.currentTimeMillis() - startTime;
+
+ assertEquals(totalOps / opsPerSec, SECONDS.convert(timeSpent, MILLISECONDS));
+ }
+
+ /**
+ * Check that the average speed is limited correctly even if we are acquiring more permits than allowed per second.
+ */
+ @Test
+ public void checkAcquireWithOverflow() throws IgniteInterruptedCheckedException {
+ double permitsPerSec = 0.5;
+ int permitsPerOp = 1;
+ int totalOps = 5;
+
+ BasicRateLimiter limiter = new BasicRateLimiter(permitsPerSec);
+
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i <= totalOps; i++)
Review comment:
Actually, you check `totalOps + 1` operations here (`i` runs from 0 to `totalOps` inclusive).
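A quick count of both loop forms; `LoopCountSketch` is a hypothetical helper for illustration only.

```java
/** Hypothetical helper counting iterations of the two loop forms. */
final class LoopCountSketch {
    /** The form used in checkAcquireWithOverflow(): i = 0..totalOps inclusive. */
    static int inclusive(int totalOps) {
        int ops = 0;

        for (int i = 0; i <= totalOps; i++)
            ops++;

        return ops;
    }

    /** Exclusive upper bound: exactly totalOps iterations. */
    static int exclusive(int totalOps) {
        int ops = 0;

        for (int i = 0; i < totalOps; i++)
            ops++;

        return ops;
    }
}
```

With `totalOps = 5`, the test's loop acquires 6 times, not 5.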
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/util/BasicRateLimiterTest.java
##########
@@ -0,0 +1,119 @@
+ long startTime = System.currentTimeMillis();
+
+ fut.get();
+
+ long timeSpent = System.currentTimeMillis() - startTime;
+
+ assertEquals(totalOps / opsPerSec, SECONDS.convert(timeSpent, MILLISECONDS));
Review comment:
In some circumstances, I think `timeSpent` can be a little bit less than
10 seconds (e.g. 9,999 ms), and `SECONDS.convert` truncates that to 9, so the assertion fails.
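A sketch of the truncation and a tolerance-based alternative; `ElapsedCheckSketch` and the 500 ms tolerance below are hypothetical, and only `TimeUnit.SECONDS.convert` is the real JDK call the test uses.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers; only TimeUnit.SECONDS.convert is the real call the test uses. */
final class ElapsedCheckSketch {
    /** What the test compares today: whole seconds, truncated toward zero. */
    static long truncatedSeconds(long millis) {
        return TimeUnit.SECONDS.convert(millis, TimeUnit.MILLISECONDS);
    }

    /** Tolerance-based alternative: accept |elapsed - expected| <= tolerance (all in ms). */
    static boolean withinTolerance(long elapsedMillis, long expectedMillis, long toleranceMillis) {
        return Math.abs(elapsedMillis - expectedMillis) <= toleranceMillis;
    }
}
```

For example, `truncatedSeconds(9_999)` returns 9, which fails `assertEquals(10, ...)`, while `withinTolerance(9_999, 10_000, 500)` accepts it; the 500 ms tolerance is an arbitrary illustrative value.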
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/encryption/AbstractEncryptionTest.java
##########
@@ -271,4 +313,185 @@ protected boolean checkMasterKeyName(String name) {
return true;
}
+
+ /**
+ * Load data into cache "{@link #cacheName()}" using node "{@link #GRID_0}".
+ *
+ * @param cnt Count of entries.
+ */
+ protected void loadData(int cnt) {
+ loadData(cacheName(), cnt);
+ }
+
+ /**
+ * Load data into cache using node "{@link #GRID_0}".
+ *
+ * @param cnt Count of entries.
+ * @param cacheName Cache name.
+ */
+ protected void loadData(String cacheName, int cnt) {
+ info("Loading " + cnt + " entries into " + cacheName);
+
+ int start = grid(GRID_0).cache(cacheName).size();
+
+ try (IgniteDataStreamer<Long, Object> streamer = grid(GRID_0).dataStreamer(cacheName)) {
+ for (long i = start; i < (cnt + start); i++)
+ streamer.addData(i, generateValue(i));
+ }
+
+ info("Load data finished");
+ }
+
+ /**
+ * Ensures that all pages of page store have expected encryption key identifier.
+ *
+ * @param grpId Cache group ID.
+ * @param keyId Encryption key ID.
+ * @param timeout Timeout to wait for encryption to complete.
+ * @throws Exception If failed.
+ */
+ protected void checkGroupKey(int grpId, int keyId, long timeout) throws Exception {
+ awaitEncryption(G.allGrids(), grpId, timeout);
+
+ for (Ignite g : G.allGrids()) {
+ info("Validating encryption key [node=" + g.cluster().localNode().id() + ", grp=" + grpId + "]");
+
+ IgniteEx grid = (IgniteEx)g;
+
+ if (grid.context().clientNode())
+ continue;
+
+ GridEncryptionManager encryption = grid.context().encryption();
+
+ assertEquals(grid.localNode().id().toString(), (byte)keyId, encryption.groupKey(grpId).id());
+
+ IgniteInternalFuture<Void> fut = encryption.reencryptionFuture(grpId);
+
+ // The future will be completed after the checkpoint, forcecheckpoint does nothing
+ // if the checkpoint has already been scheduled.
+ GridTestUtils.waitForCondition(() -> {
+ if (fut.isDone())
+ return true;
+
+ try {
+ forceCheckpoint(g);
+ }
+ catch (IgniteCheckedException e) {
+ throw new RuntimeException(e);
+ }
+
+ return fut.isDone();
+ }, timeout);
+
+ assertTrue(fut.isDone());
+
+ CacheGroupContext grp = grid.context().cache().cacheGroup(grpId);
+
+ List<Integer> parts = IntStream.range(0, grp.shared().affinity().affinity(grpId).partitions())
+ .boxed().collect(Collectors.toList());
+
+ parts.add(INDEX_PARTITION);
+
+ int realPageSize = grp.dataRegion().pageMemory().realPageSize(grpId);
+ int encryptionBlockSize = grp.shared().kernalContext().config().getEncryptionSpi().blockSize();
+
+ for (int p : parts) {
+ FilePageStore pageStore =
+ (FilePageStore)((FilePageStoreManager)grp.shared().pageStore()).getStore(grpId, p);
+
+ if (!pageStore.exists())
+ continue;
+
+ long state = grid.context().encryption().getEncryptionState(grpId, p);
+
+ String msg = String.format("p=%d, off=%d, total=%d",
+ p, ReencryptStateUtils.pageIndex(state), ReencryptStateUtils.pageCount(state));
+
+ assertEquals(msg, 0, ReencryptStateUtils.pageCount(state));
+ assertEquals(msg, 0, ReencryptStateUtils.pageIndex(state));
+
+ long startPageId = PageIdUtils.pageId(p, PageIdAllocator.FLAG_DATA, 0);
+
+ int pagesCnt = pageStore.pages();
+ int pageSize = pageStore.getPageSize();
+
+ ByteBuffer pageBuf = ByteBuffer.allocate(pageSize);
+
+ Path path = new File(pageStore.getFileAbsolutePath()).toPath();
+
+ try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
+ for (int n = 0; n < pagesCnt; n++) {
+ long pageId = startPageId + n;
+ long pageOff = pageStore.pageOffset(pageId);
+
+ pageBuf.position(0);
+
+ ch.position(pageOff);
+ ch.read(pageBuf);
+
+ pageBuf.position(realPageSize + encryptionBlockSize);
+
+ // If crc present
Review comment:
Missing period at the end.
Why can there be pages without a CRC?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]