alex-plekhanov commented on a change in pull request #7941: URL: https://github.com/apache/ignite/pull/7941#discussion_r475452890
########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.EncryptionConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import 
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.util.BasicRateLimiter; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteInClosureX; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; + +import static org.apache.ignite.internal.util.IgniteUtils.MB; + +/** + * Cache group page stores scanner. + * Scans a range of pages and marks them as dirty to re-encrypt them with the last encryption key on disk. + */ +public class CacheGroupPageScanner implements DbCheckpointListener { + /** Encryption configuration. */ + private final EncryptionConfiguration encrCfg; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Lock. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** Mapping of cache group ID to group scanning task. */ + private final Map<Integer, GroupScanTask> grps = new ConcurrentHashMap<>(); + + /** Collection of groups waiting for a checkpoint. */ + private final Collection<GroupScanTask> cpWaitGrps = new ConcurrentLinkedQueue<>(); + + /** Page scanning speed limiter. */ + private final BasicRateLimiter limiter; + + /** Stop flag. */ + private boolean stopped; + + /** + * @param ctx Grid kernal context. 
+ */ + public CacheGroupPageScanner(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + + encrCfg = ctx.config().getDataStorageConfiguration().getEncryptionConfiguration(); + + DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); + + limiter = CU.isPersistenceEnabled(dsCfg) && encrCfg.getReencryptionRateLimit() > 0 ? + new BasicRateLimiter(encrCfg.getReencryptionRateLimit() * MB / dsCfg.getPageSize()) : null; + } + + /** {@inheritDoc} */ + @Override public void onCheckpointBegin(Context cpCtx) { + Set<GroupScanTask> completeCandidates = new HashSet<>(); + + cpWaitGrps.removeIf(completeCandidates::add); + + cpCtx.finishedStateFut().listen( + f -> { + // Retry if error occurs. + if (f.error() != null || f.isCancelled()) { + cpWaitGrps.addAll(completeCandidates); + + return; + } + + lock.lock(); + + try { + for (GroupScanTask grpScanTask : completeCandidates) { + grps.remove(grpScanTask.groupId()); + + grpScanTask.onDone(); + + if (log.isInfoEnabled()) + log.info("Cache group reencryption is finished [grpId=" + grpScanTask.groupId() + "]"); + } + + if (!grps.isEmpty()) + return; + + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()). + removeCheckpointListener(this); + } + finally { + lock.unlock(); + } + } + ); + } + + /** {@inheritDoc} */ + @Override public void beforeCheckpointBegin(Context cpCtx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onMarkCheckpointBegin(Context ctx) { + // No-op. + } + + /** + * @return {@code True} If reencryption is disabled. + */ + public boolean disabled() { + return encrCfg.isReencryptionDisabled(); + } + + /** + * Schedule scanning partitions. + * + * @param grpId Cache group ID. 
+ */ + public IgniteInternalFuture<Void> schedule(int grpId) throws IgniteCheckedException { + if (disabled()) + throw new IgniteCheckedException("Reencryption is disabled."); + + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); + + if (grp == null) { + if (log.isDebugEnabled()) + log.debug("Skip reencryption, cache group was destroyed [grp=" + grpId + "]"); + + return new GridFinishedFuture<>(); + } + + lock.lock(); + + try { + if (stopped) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + if (grps.isEmpty()) + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this); + + GroupScanTask prevState = grps.get(grpId); + + if (prevState != null) { + if (log.isDebugEnabled()) + log.debug("Reencryption already scheduled [grpId=" + grpId + "]"); + + return prevState; + } + + Set<Integer> parts = new HashSet<>(); + + forEachPageStore(grp, new IgniteInClosureX<Integer>() { + @Override public void applyx(Integer partId) { + if (ctx.encryption().getEncryptionState(grpId, partId) == 0) { + if (log.isDebugEnabled()) + log.debug("Skipping partition reencryption [grp=" + grpId + ", p=" + partId + "]"); + + return; + } + + parts.add(partId); + } + }); + + GroupScanTask grpScan = new GroupScanTask(grp, parts); + + ctx.getSystemExecutorService().submit(grpScan); + + if (log.isInfoEnabled()) + log.info("Scheduled reencryption [grpId=" + grpId + "]"); + + grps.put(grpId, grpScan); + + return grpScan; + } + finally { + lock.unlock(); + } + } + + /** + * @param grpId Cache group ID. + * @return Future that will be completed when all partitions have been scanned and pages have been written to disk. + */ + public IgniteInternalFuture<Void> statusFuture(int grpId) { + GroupScanTask ctx0 = grps.get(grpId); + + return ctx0 == null ? new GridFinishedFuture<>() : ctx0; + } + + /** + * Shutdown scanning and disable new tasks scheduling. 
+ */ + public void stop() throws IgniteCheckedException { + lock.lock(); + + try { + stopped = true; + + for (GroupScanTask grpScanTask : grps.values()) + grpScanTask.cancel(); + } finally { + lock.unlock(); + } + } + + /** + * Stop scannig the specified partition. + * + * @param grpId Cache group ID. + * @param partId Partition ID. + * @return {@code True} if reencryption was cancelled. + */ + public boolean cancel(int grpId, int partId) { + GroupScanTask grpScanTask = grps.get(grpId); + + if (grpScanTask == null) + return false; + + return grpScanTask.cancel(partId); + } + + /** + * Collect current number of pages in the specified cache group. + * + * @param grp Cache group. + * @return Partitions with current page count. + * @throws IgniteCheckedException If failed. + */ + public long[] pagesCount(CacheGroupContext grp) throws IgniteCheckedException { + long[] partStates = new long[grp.affinity().partitions() + 1]; + + ctx.cache().context().database().checkpointReadLock(); + + try { + forEachPageStore(grp, new IgniteInClosureX<Integer>() { + @Override public void applyx(Integer partId) throws IgniteCheckedException { + int pagesCnt = ctx.cache().context().pageStore().pages(grp.groupId(), partId); + + partStates[Math.min(partId, partStates.length - 1)] = pagesCnt; + } + }); + } finally { + ctx.cache().context().database().checkpointReadUnlock(); + } + + return partStates; + } + + /** + * @param grp Cache group. + * @param hnd Partition handler. + */ + private void forEachPageStore(CacheGroupContext grp, IgniteInClosureX<Integer> hnd) throws IgniteCheckedException { + int parts = grp.affinity().partitions(); + + IgnitePageStoreManager pageStoreMgr = ctx.cache().context().pageStore(); + + for (int p = 0; p < parts; p++) { + if (!pageStoreMgr.exists(grp.groupId(), p)) + continue; + + hnd.applyx(p); + } + + hnd.applyx(PageIdAllocator.INDEX_PARTITION); + } + + /** + * Cache group partition scanning task. 
+ */ + private class GroupScanTask extends GridFutureAdapter<Void> implements Runnable { + /** Cache group ID. */ + private final CacheGroupContext grp; + + /** Partition IDs. */ + private final Set<Integer> parts; + + /** Page memory. */ + private final PageMemoryEx pageMem; + + /** + * @param grp Cache group. + */ + public GroupScanTask(CacheGroupContext grp, Set<Integer> parts) { + this.grp = grp; + this.parts = new GridConcurrentHashSet<>(parts); + + pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean cancel() throws IgniteCheckedException { + return onDone(null, null, true); + } + + /** + * Stop reencryption of the specified partition. + * + * @param partId Partition ID. + * @return {@code True} if reencryption was cancelled. + */ + public synchronized boolean cancel(int partId) { Review comment: Perhaps `cancel` is not a good method name for a page scanner. Maybe something like `excludePartition`? Also, I think `synchronized` is redundant here. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.EncryptionConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.util.BasicRateLimiter; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteInClosureX; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; + +import static org.apache.ignite.internal.util.IgniteUtils.MB; + +/** + * Cache group page stores scanner. + * Scans a range of pages and marks them as dirty to re-encrypt them with the last encryption key on disk. 
+ */ +public class CacheGroupPageScanner implements DbCheckpointListener { + /** Encryption configuration. */ + private final EncryptionConfiguration encrCfg; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Lock. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** Mapping of cache group ID to group scanning task. */ + private final Map<Integer, GroupScanTask> grps = new ConcurrentHashMap<>(); + + /** Collection of groups waiting for a checkpoint. */ + private final Collection<GroupScanTask> cpWaitGrps = new ConcurrentLinkedQueue<>(); + + /** Page scanning speed limiter. */ + private final BasicRateLimiter limiter; + + /** Stop flag. */ + private boolean stopped; + + /** + * @param ctx Grid kernal context. + */ + public CacheGroupPageScanner(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + + encrCfg = ctx.config().getDataStorageConfiguration().getEncryptionConfiguration(); + + DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); + + limiter = CU.isPersistenceEnabled(dsCfg) && encrCfg.getReencryptionRateLimit() > 0 ? + new BasicRateLimiter(encrCfg.getReencryptionRateLimit() * MB / dsCfg.getPageSize()) : null; + } + + /** {@inheritDoc} */ + @Override public void onCheckpointBegin(Context cpCtx) { + Set<GroupScanTask> completeCandidates = new HashSet<>(); + + cpWaitGrps.removeIf(completeCandidates::add); + + cpCtx.finishedStateFut().listen( + f -> { + // Retry if error occurs. 
+ if (f.error() != null || f.isCancelled()) { + cpWaitGrps.addAll(completeCandidates); + + return; + } + + lock.lock(); + + try { + for (GroupScanTask grpScanTask : completeCandidates) { + grps.remove(grpScanTask.groupId()); + + grpScanTask.onDone(); + + if (log.isInfoEnabled()) + log.info("Cache group reencryption is finished [grpId=" + grpScanTask.groupId() + "]"); + } + + if (!grps.isEmpty()) + return; + + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()). + removeCheckpointListener(this); + } + finally { + lock.unlock(); + } + } + ); + } + + /** {@inheritDoc} */ + @Override public void beforeCheckpointBegin(Context cpCtx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onMarkCheckpointBegin(Context ctx) { + // No-op. + } + + /** + * @return {@code True} If reencryption is disabled. + */ + public boolean disabled() { + return encrCfg.isReencryptionDisabled(); + } + + /** + * Schedule scanning partitions. + * + * @param grpId Cache group ID. + */ + public IgniteInternalFuture<Void> schedule(int grpId) throws IgniteCheckedException { + if (disabled()) + throw new IgniteCheckedException("Reencryption is disabled."); + + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); + + if (grp == null) { + if (log.isDebugEnabled()) + log.debug("Skip reencryption, cache group was destroyed [grp=" + grpId + "]"); + + return new GridFinishedFuture<>(); + } + + lock.lock(); + + try { + if (stopped) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + if (grps.isEmpty()) + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this); + + GroupScanTask prevState = grps.get(grpId); + + if (prevState != null) { + if (log.isDebugEnabled()) + log.debug("Reencryption already scheduled [grpId=" + grpId + "]"); + + return prevState; + } + + Set<Integer> parts = new HashSet<>(); + + forEachPageStore(grp, new IgniteInClosureX<Integer>() { + @Override public void applyx(Integer 
partId) { + if (ctx.encryption().getEncryptionState(grpId, partId) == 0) { + if (log.isDebugEnabled()) + log.debug("Skipping partition reencryption [grp=" + grpId + ", p=" + partId + "]"); + + return; + } + + parts.add(partId); + } + }); + + GroupScanTask grpScan = new GroupScanTask(grp, parts); + + ctx.getSystemExecutorService().submit(grpScan); + + if (log.isInfoEnabled()) + log.info("Scheduled reencryption [grpId=" + grpId + "]"); + + grps.put(grpId, grpScan); + + return grpScan; + } + finally { + lock.unlock(); + } + } + + /** + * @param grpId Cache group ID. + * @return Future that will be completed when all partitions have been scanned and pages have been written to disk. + */ + public IgniteInternalFuture<Void> statusFuture(int grpId) { + GroupScanTask ctx0 = grps.get(grpId); + + return ctx0 == null ? new GridFinishedFuture<>() : ctx0; + } + + /** + * Shutdown scanning and disable new tasks scheduling. + */ + public void stop() throws IgniteCheckedException { + lock.lock(); + + try { + stopped = true; + + for (GroupScanTask grpScanTask : grps.values()) + grpScanTask.cancel(); + } finally { + lock.unlock(); + } + } + + /** + * Stop scannig the specified partition. + * + * @param grpId Cache group ID. + * @param partId Partition ID. + * @return {@code True} if reencryption was cancelled. + */ + public boolean cancel(int grpId, int partId) { + GroupScanTask grpScanTask = grps.get(grpId); + + if (grpScanTask == null) + return false; + + return grpScanTask.cancel(partId); + } + + /** + * Collect current number of pages in the specified cache group. + * + * @param grp Cache group. + * @return Partitions with current page count. + * @throws IgniteCheckedException If failed. 
+ */ + public long[] pagesCount(CacheGroupContext grp) throws IgniteCheckedException { + long[] partStates = new long[grp.affinity().partitions() + 1]; Review comment: Let's add a comment explaining that we need one extra item for the index partition. ########## File path: modules/core/src/main/java/org/apache/ignite/mxbean/EncryptionMXBean.java ########## @@ -43,4 +44,15 @@ public void changeMasterKey( @MXBeanParameter(name = "masterKeyName", description = "Master key name.") String masterKeyName ); + + /** + * Starts cache group encryption key change process. + * + * @param cacheOrGrpName Cache or group name. + * @see IgniteEncryption#changeCacheGroupKey(Collection) + */ + @MXBeanDescription("Change cache group key.") + public void changeCacheGroupKey( + @MXBeanParameter(name = "cacheOrGroupName", description = "Cache or group name.") String cacheOrGrpName Review comment: Let's rename the parameter `cacheOrGroupName` to just `cacheGroupName`; otherwise it's confusing, and someone may conclude that it's possible to change keys for a single cache within a cache group. (I know that the method CacheGroupContext.cacheOrGroupName exists, but it's an internal API.) ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.EncryptionConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.util.BasicRateLimiter; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteInClosureX; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; + +import static org.apache.ignite.internal.util.IgniteUtils.MB; + +/** + * Cache group page stores scanner. 
+ * Scans a range of pages and marks them as dirty to re-encrypt them with the last encryption key on disk. + */ +public class CacheGroupPageScanner implements DbCheckpointListener { + /** Encryption configuration. */ + private final EncryptionConfiguration encrCfg; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Lock. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** Mapping of cache group ID to group scanning task. */ + private final Map<Integer, GroupScanTask> grps = new ConcurrentHashMap<>(); + + /** Collection of groups waiting for a checkpoint. */ + private final Collection<GroupScanTask> cpWaitGrps = new ConcurrentLinkedQueue<>(); + + /** Page scanning speed limiter. */ + private final BasicRateLimiter limiter; + + /** Stop flag. */ + private boolean stopped; + + /** + * @param ctx Grid kernal context. + */ + public CacheGroupPageScanner(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + + encrCfg = ctx.config().getDataStorageConfiguration().getEncryptionConfiguration(); + + DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); + + limiter = CU.isPersistenceEnabled(dsCfg) && encrCfg.getReencryptionRateLimit() > 0 ? + new BasicRateLimiter(encrCfg.getReencryptionRateLimit() * MB / dsCfg.getPageSize()) : null; + } + + /** {@inheritDoc} */ + @Override public void onCheckpointBegin(Context cpCtx) { + Set<GroupScanTask> completeCandidates = new HashSet<>(); + + cpWaitGrps.removeIf(completeCandidates::add); + + cpCtx.finishedStateFut().listen( + f -> { + // Retry if error occurs. 
+ if (f.error() != null || f.isCancelled()) { + cpWaitGrps.addAll(completeCandidates); + + return; + } + + lock.lock(); + + try { + for (GroupScanTask grpScanTask : completeCandidates) { + grps.remove(grpScanTask.groupId()); + + grpScanTask.onDone(); + + if (log.isInfoEnabled()) + log.info("Cache group reencryption is finished [grpId=" + grpScanTask.groupId() + "]"); + } + + if (!grps.isEmpty()) + return; + + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()). + removeCheckpointListener(this); + } + finally { + lock.unlock(); + } + } + ); + } + + /** {@inheritDoc} */ + @Override public void beforeCheckpointBegin(Context cpCtx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onMarkCheckpointBegin(Context ctx) { + // No-op. + } + + /** + * @return {@code True} If reencryption is disabled. + */ + public boolean disabled() { + return encrCfg.isReencryptionDisabled(); + } + + /** + * Schedule scanning partitions. + * + * @param grpId Cache group ID. + */ + public IgniteInternalFuture<Void> schedule(int grpId) throws IgniteCheckedException { + if (disabled()) + throw new IgniteCheckedException("Reencryption is disabled."); + + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); + + if (grp == null) { + if (log.isDebugEnabled()) + log.debug("Skip reencryption, cache group was destroyed [grp=" + grpId + "]"); + + return new GridFinishedFuture<>(); + } + + lock.lock(); + + try { + if (stopped) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + if (grps.isEmpty()) + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this); + + GroupScanTask prevState = grps.get(grpId); + + if (prevState != null) { + if (log.isDebugEnabled()) + log.debug("Reencryption already scheduled [grpId=" + grpId + "]"); + + return prevState; + } + + Set<Integer> parts = new HashSet<>(); + + forEachPageStore(grp, new IgniteInClosureX<Integer>() { + @Override public void applyx(Integer 
partId) { + if (ctx.encryption().getEncryptionState(grpId, partId) == 0) { + if (log.isDebugEnabled()) + log.debug("Skipping partition reencryption [grp=" + grpId + ", p=" + partId + "]"); + + return; + } + + parts.add(partId); + } + }); + + GroupScanTask grpScan = new GroupScanTask(grp, parts); + + ctx.getSystemExecutorService().submit(grpScan); + + if (log.isInfoEnabled()) + log.info("Scheduled reencryption [grpId=" + grpId + "]"); + + grps.put(grpId, grpScan); + + return grpScan; + } + finally { + lock.unlock(); + } + } + + /** + * @param grpId Cache group ID. + * @return Future that will be completed when all partitions have been scanned and pages have been written to disk. + */ + public IgniteInternalFuture<Void> statusFuture(int grpId) { + GroupScanTask ctx0 = grps.get(grpId); + + return ctx0 == null ? new GridFinishedFuture<>() : ctx0; + } + + /** + * Shutdown scanning and disable new tasks scheduling. + */ + public void stop() throws IgniteCheckedException { + lock.lock(); + + try { + stopped = true; + + for (GroupScanTask grpScanTask : grps.values()) + grpScanTask.cancel(); + } finally { + lock.unlock(); + } + } + + /** + * Stop scannig the specified partition. + * + * @param grpId Cache group ID. + * @param partId Partition ID. + * @return {@code True} if reencryption was cancelled. + */ + public boolean cancel(int grpId, int partId) { + GroupScanTask grpScanTask = grps.get(grpId); + + if (grpScanTask == null) + return false; + + return grpScanTask.cancel(partId); + } + + /** + * Collect current number of pages in the specified cache group. + * + * @param grp Cache group. + * @return Partitions with current page count. + * @throws IgniteCheckedException If failed. 
+ */ + public long[] pagesCount(CacheGroupContext grp) throws IgniteCheckedException { + long[] partStates = new long[grp.affinity().partitions() + 1]; + + ctx.cache().context().database().checkpointReadLock(); + + try { + forEachPageStore(grp, new IgniteInClosureX<Integer>() { + @Override public void applyx(Integer partId) throws IgniteCheckedException { + int pagesCnt = ctx.cache().context().pageStore().pages(grp.groupId(), partId); + + partStates[Math.min(partId, partStates.length - 1)] = pagesCnt; + } + }); + } finally { + ctx.cache().context().database().checkpointReadUnlock(); + } + + return partStates; + } + + /** + * @param grp Cache group. + * @param hnd Partition handler. + */ + private void forEachPageStore(CacheGroupContext grp, IgniteInClosureX<Integer> hnd) throws IgniteCheckedException { + int parts = grp.affinity().partitions(); + + IgnitePageStoreManager pageStoreMgr = ctx.cache().context().pageStore(); + + for (int p = 0; p < parts; p++) { + if (!pageStoreMgr.exists(grp.groupId(), p)) + continue; + + hnd.applyx(p); + } + + hnd.applyx(PageIdAllocator.INDEX_PARTITION); + } + + /** + * Cache group partition scanning task. + */ + private class GroupScanTask extends GridFutureAdapter<Void> implements Runnable { + /** Cache group ID. */ + private final CacheGroupContext grp; + + /** Partition IDs. */ + private final Set<Integer> parts; + + /** Page memory. */ + private final PageMemoryEx pageMem; + + /** + * @param grp Cache group. + */ + public GroupScanTask(CacheGroupContext grp, Set<Integer> parts) { + this.grp = grp; + this.parts = new GridConcurrentHashSet<>(parts); + + pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean cancel() throws IgniteCheckedException { Review comment: synchronized is redundant here. 
########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -611,7 +695,12 @@ public void groupKey(int grpId, byte[] encGrpKey) { "The previous change was not completed.")); } - masterKeyChangeFut = new MasterKeyChangeFuture(request.requestId()); + if (!grpKeyChangeProc.finished()) { Review comment: `finished()` can be `false` only on the node where the key change process was initiated. Why don't we restrict master key change on other nodes? Do we need the `finished` method at all? (Perhaps it can be replaced with `started()`.) ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java ########## @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.EncryptionConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.util.BasicRateLimiter; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteInClosureX; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; + +import static org.apache.ignite.internal.util.IgniteUtils.MB; + +/** + * Cache group page stores scanner. + * Scans a range of pages and marks them as dirty to re-encrypt them with the last encryption key on disk. 
+ */ +public class CacheGroupPageScanner implements DbCheckpointListener { + /** Encryption configuration. */ + private final EncryptionConfiguration encrCfg; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Lock. */ + private final ReentrantLock lock = new ReentrantLock(); + + /** Mapping of cache group ID to group scanning task. */ + private final Map<Integer, GroupScanTask> grps = new ConcurrentHashMap<>(); + + /** Collection of groups waiting for a checkpoint. */ + private final Collection<GroupScanTask> cpWaitGrps = new ConcurrentLinkedQueue<>(); + + /** Page scanning speed limiter. */ + private final BasicRateLimiter limiter; + + /** Stop flag. */ + private boolean stopped; + + /** + * @param ctx Grid kernal context. + */ + public CacheGroupPageScanner(GridKernalContext ctx) { + this.ctx = ctx; + + log = ctx.log(getClass()); + + encrCfg = ctx.config().getDataStorageConfiguration().getEncryptionConfiguration(); + + DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); + + limiter = CU.isPersistenceEnabled(dsCfg) && encrCfg.getReencryptionRateLimit() > 0 ? + new BasicRateLimiter(encrCfg.getReencryptionRateLimit() * MB / dsCfg.getPageSize()) : null; + } + + /** {@inheritDoc} */ + @Override public void onCheckpointBegin(Context cpCtx) { + Set<GroupScanTask> completeCandidates = new HashSet<>(); + + cpWaitGrps.removeIf(completeCandidates::add); + + cpCtx.finishedStateFut().listen( + f -> { + // Retry if error occurs. 
+ if (f.error() != null || f.isCancelled()) { + cpWaitGrps.addAll(completeCandidates); + + return; + } + + lock.lock(); + + try { + for (GroupScanTask grpScanTask : completeCandidates) { + grps.remove(grpScanTask.groupId()); + + grpScanTask.onDone(); + + if (log.isInfoEnabled()) + log.info("Cache group reencryption is finished [grpId=" + grpScanTask.groupId() + "]"); + } + + if (!grps.isEmpty()) + return; + + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()). + removeCheckpointListener(this); + } + finally { + lock.unlock(); + } + } + ); + } + + /** {@inheritDoc} */ + @Override public void beforeCheckpointBegin(Context cpCtx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onMarkCheckpointBegin(Context ctx) { + // No-op. + } + + /** + * @return {@code True} If reencryption is disabled. + */ + public boolean disabled() { + return encrCfg.isReencryptionDisabled(); + } + + /** + * Schedule scanning partitions. + * + * @param grpId Cache group ID. + */ + public IgniteInternalFuture<Void> schedule(int grpId) throws IgniteCheckedException { + if (disabled()) + throw new IgniteCheckedException("Reencryption is disabled."); + + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); + + if (grp == null) { + if (log.isDebugEnabled()) + log.debug("Skip reencryption, cache group was destroyed [grp=" + grpId + "]"); + + return new GridFinishedFuture<>(); + } + + lock.lock(); + + try { + if (stopped) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + if (grps.isEmpty()) + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this); + + GroupScanTask prevState = grps.get(grpId); + + if (prevState != null) { + if (log.isDebugEnabled()) + log.debug("Reencryption already scheduled [grpId=" + grpId + "]"); + + return prevState; + } + + Set<Integer> parts = new HashSet<>(); + + forEachPageStore(grp, new IgniteInClosureX<Integer>() { + @Override public void applyx(Integer 
partId) { + if (ctx.encryption().getEncryptionState(grpId, partId) == 0) { + if (log.isDebugEnabled()) + log.debug("Skipping partition reencryption [grp=" + grpId + ", p=" + partId + "]"); + + return; + } + + parts.add(partId); + } + }); + + GroupScanTask grpScan = new GroupScanTask(grp, parts); + + ctx.getSystemExecutorService().submit(grpScan); + + if (log.isInfoEnabled()) + log.info("Scheduled reencryption [grpId=" + grpId + "]"); + + grps.put(grpId, grpScan); + + return grpScan; + } + finally { + lock.unlock(); + } + } + + /** + * @param grpId Cache group ID. + * @return Future that will be completed when all partitions have been scanned and pages have been written to disk. + */ + public IgniteInternalFuture<Void> statusFuture(int grpId) { + GroupScanTask ctx0 = grps.get(grpId); + + return ctx0 == null ? new GridFinishedFuture<>() : ctx0; + } + + /** + * Shutdown scanning and disable new tasks scheduling. + */ + public void stop() throws IgniteCheckedException { + lock.lock(); + + try { + stopped = true; + + for (GroupScanTask grpScanTask : grps.values()) + grpScanTask.cancel(); + } finally { + lock.unlock(); + } + } + + /** + * Stop scannig the specified partition. + * + * @param grpId Cache group ID. + * @param partId Partition ID. + * @return {@code True} if reencryption was cancelled. + */ + public boolean cancel(int grpId, int partId) { + GroupScanTask grpScanTask = grps.get(grpId); + + if (grpScanTask == null) + return false; + + return grpScanTask.cancel(partId); + } + + /** + * Collect current number of pages in the specified cache group. + * + * @param grp Cache group. + * @return Partitions with current page count. + * @throws IgniteCheckedException If failed. 
+ */ + public long[] pagesCount(CacheGroupContext grp) throws IgniteCheckedException { + long[] partStates = new long[grp.affinity().partitions() + 1]; + + ctx.cache().context().database().checkpointReadLock(); + + try { + forEachPageStore(grp, new IgniteInClosureX<Integer>() { + @Override public void applyx(Integer partId) throws IgniteCheckedException { + int pagesCnt = ctx.cache().context().pageStore().pages(grp.groupId(), partId); + + partStates[Math.min(partId, partStates.length - 1)] = pagesCnt; + } + }); + } finally { + ctx.cache().context().database().checkpointReadUnlock(); + } + + return partStates; + } + + /** + * @param grp Cache group. + * @param hnd Partition handler. + */ + private void forEachPageStore(CacheGroupContext grp, IgniteInClosureX<Integer> hnd) throws IgniteCheckedException { + int parts = grp.affinity().partitions(); + + IgnitePageStoreManager pageStoreMgr = ctx.cache().context().pageStore(); + + for (int p = 0; p < parts; p++) { + if (!pageStoreMgr.exists(grp.groupId(), p)) + continue; + + hnd.applyx(p); + } + + hnd.applyx(PageIdAllocator.INDEX_PARTITION); + } + + /** + * Cache group partition scanning task. + */ + private class GroupScanTask extends GridFutureAdapter<Void> implements Runnable { + /** Cache group ID. */ + private final CacheGroupContext grp; + + /** Partition IDs. */ + private final Set<Integer> parts; + + /** Page memory. */ + private final PageMemoryEx pageMem; + + /** + * @param grp Cache group. + */ + public GroupScanTask(CacheGroupContext grp, Set<Integer> parts) { + this.grp = grp; + this.parts = new GridConcurrentHashSet<>(parts); + + pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean cancel() throws IgniteCheckedException { + return onDone(null, null, true); Review comment: `return onCancelled()`? 
########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -503,58 +549,96 @@ else if (newKeys != null) { /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { + assert !writeToMetaStoreEnabled; + if (ctx.clientNode()) return; - Map<Integer, byte[]> encKeysFromCluster = (Map<Integer, byte[]>)data.commonData(); + Map<Integer, Object> encKeysFromCluster = (Map<Integer, Object>)data.commonData(); if (F.isEmpty(encKeysFromCluster)) return; - for (Map.Entry<Integer, byte[]> entry : encKeysFromCluster.entrySet()) { - if (groupKey(entry.getKey()) == null) { - U.quietAndInfo(log, "Store group key received from coordinator [grp=" + entry.getKey() + "]"); + for (Map.Entry<Integer, Object> entry : encKeysFromCluster.entrySet()) { + int grpId = entry.getKey(); - groupKey(entry.getKey(), entry.getValue()); - } - else { + GroupKeyEncrypted rmtKey; + + if (entry.getValue() instanceof GroupKeyEncrypted) + rmtKey = (GroupKeyEncrypted)entry.getValue(); + else + rmtKey = new GroupKeyEncrypted(INITIAL_KEY_ID, (byte[])entry.getValue()); + + GroupKey locGrpKey = groupKey(grpId); + + if (locGrpKey != null && locGrpKey.unsignedId() == rmtKey.id()) { U.quietAndInfo(log, "Skip group key received from coordinator. Already exists. [grp=" + - entry.getKey() + "]"); + grpId + ", keyId=" + rmtKey.id() + "]"); + + continue; } + + U.quietAndInfo(log, "Store group key received from coordinator [grp=" + grpId + + ", keyId=" + rmtKey.id() + "]"); + + //changeActiveKey Review comment: ? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyChangeProcess.java ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteFeatures; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.encryption.GridEncryptionManager.EmptyResult; +import org.apache.ignite.internal.managers.encryption.GridEncryptionManager.KeyChangeFuture; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteFutureCancelledException; + +import static 
org.apache.ignite.internal.IgniteFeatures.CACHE_GROUP_KEY_CHANGE; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CACHE_GROUP_KEY_CHANGE_FINISH; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CACHE_GROUP_KEY_CHANGE_PREPARE; + +/** + * Two phase distributed process, that performs cache group encryption key rotation. + */ +class GroupKeyChangeProcess { + /** Grid kernal context. */ + private final GridKernalContext ctx; + + /** Cache group encyption key change prepare phase. */ + private final DistributedProcess<ChangeCacheEncryptionRequest, EmptyResult> prepareGKChangeProc; + + /** Cache group encyption key change perform phase. */ + private final DistributedProcess<ChangeCacheEncryptionRequest, EmptyResult> performGKChangeProc; + + /** Group encryption keys. */ + private final CacheGroupEncryptionKeys keys; + + /** Cache group key change future. */ + private volatile GroupKeyChangeFuture fut; + + /** Cache group key change request. */ + private volatile ChangeCacheEncryptionRequest req; + + /** + * @param ctx Grid kernal context. + */ + GroupKeyChangeProcess(GridKernalContext ctx, CacheGroupEncryptionKeys keys) { + this.ctx = ctx; + this.keys = keys; + + prepareGKChangeProc = + new DistributedProcess<>(ctx, CACHE_GROUP_KEY_CHANGE_PREPARE, this::prepare, this::finishPrepare); + performGKChangeProc = + new DistributedProcess<>(ctx, CACHE_GROUP_KEY_CHANGE_FINISH, this::perform, this::finishPerform); + } + + /** + * @return {@code True} if operation is still in progress. + */ + public boolean started() { + return req != null; + } + + /** + * @return {@code True} if operation is not finished. + */ + public boolean finished() { + IgniteInternalFuture<Void> fut0 = fut; + + return fut0 == null || fut0.isDone(); + } + + /** + * @param msg Error message. 
+ */ + public void cancel(String msg) { + GridFutureAdapter<Void> keyChangeFut = fut; + + if (keyChangeFut != null && !keyChangeFut.isDone()) + keyChangeFut.onDone(new IgniteFutureCancelledException(msg)); + } + + /** + * Starts cache group encryption key change process. + * + * @param cacheOrGrpNames Cache or group names. + */ + public IgniteFuture<Void> start(Collection<String> cacheOrGrpNames) { + if (ctx.clientNode()) + throw new UnsupportedOperationException("Client and daemon nodes can not perform this operation."); + + if (!IgniteFeatures.allNodesSupports(ctx.grid().cluster().nodes(), CACHE_GROUP_KEY_CHANGE)) + throw new IllegalStateException("Not all nodes in the cluster support this operation."); + + if (!ctx.state().clusterState().state().active()) + throw new IgniteException("Operation was rejected. The cluster is inactive."); + + if (!finished()) { + return new IgniteFinishedFutureImpl<>(new IgniteException("Cache group key change was rejected. " + + "The previous change was not completed.")); + } + + int[] grpIds = new int[cacheOrGrpNames.size()]; + byte[] keyIds = new byte[grpIds.length]; + + int n = 0; + + for (String cacheOrGroupName : cacheOrGrpNames) { + CacheGroupContext grp = ctx.cache().cacheGroup(CU.cacheId(cacheOrGroupName)); Review comment: A cache group context does not always exist on each node (for example, a node filter can be used). Use cache group descriptors here instead of the cache group context. Also, I think the cache check is redundant; you should use the cacheGroupName parameter and fail if the cache group doesn't exist.
########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -483,16 +530,15 @@ public void onLocalJoin() { if (dataBag.isJoiningNodeClient() || dataBag.commonDataCollectedFor(ENCRYPTION_MGR.ordinal())) return; - HashMap<Integer, byte[]> knownEncKeys = knownEncryptionKeys(); + HashMap<Integer, GroupKeyEncrypted> knownEncKeys = grpKeys.getAll(); Review comment: Can `knownEncKeys` be `null` here? If yes, why `null` check was removed? You can get NPE on `knownEncKeys.putIfAbsent`. If no, there is a redundant check in `newEncryptionKeys`. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -811,6 +1124,53 @@ private void sendGenerateEncryptionKeyRequest(GenerateEncryptionKeyFuture fut) t ctx.io().sendToGridTopic(rndNode.id(), TOPIC_GEN_ENC_KEY, req, SYSTEM_POOL); } + /** + * @param grpIds Cache group IDs. + * @throws IgniteCheckedException If failed. + */ + private void startReencryption(Collection<Integer> grpIds) throws IgniteCheckedException { + if (pageScanner.disabled()) + return; + + for (int grpId : grpIds) { + IgniteInternalFuture<?> fut = pageScanner.schedule(grpId); + + fut.listen(f -> { + try { + f.get(); Review comment: `if (f.isCancelled()) {...} else if (f.error() != null) {...} else {...}` ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyChangeProcess.java ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteFeatures; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.encryption.GridEncryptionManager.EmptyResult; +import org.apache.ignite.internal.managers.encryption.GridEncryptionManager.KeyChangeFuture; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteFutureCancelledException; + +import static org.apache.ignite.internal.IgniteFeatures.CACHE_GROUP_KEY_CHANGE; +import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CACHE_GROUP_KEY_CHANGE_FINISH; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CACHE_GROUP_KEY_CHANGE_PREPARE; + +/** + * Two phase distributed process, that performs cache group encryption key rotation. + */ +class GroupKeyChangeProcess { + /** Grid kernal context. */ + private final GridKernalContext ctx; + + /** Cache group encyption key change prepare phase. */ + private final DistributedProcess<ChangeCacheEncryptionRequest, EmptyResult> prepareGKChangeProc; + + /** Cache group encyption key change perform phase. */ + private final DistributedProcess<ChangeCacheEncryptionRequest, EmptyResult> performGKChangeProc; + + /** Group encryption keys. */ + private final CacheGroupEncryptionKeys keys; + + /** Cache group key change future. */ + private volatile GroupKeyChangeFuture fut; + + /** Cache group key change request. */ + private volatile ChangeCacheEncryptionRequest req; + + /** + * @param ctx Grid kernal context. + */ + GroupKeyChangeProcess(GridKernalContext ctx, CacheGroupEncryptionKeys keys) { + this.ctx = ctx; + this.keys = keys; + + prepareGKChangeProc = + new DistributedProcess<>(ctx, CACHE_GROUP_KEY_CHANGE_PREPARE, this::prepare, this::finishPrepare); + performGKChangeProc = + new DistributedProcess<>(ctx, CACHE_GROUP_KEY_CHANGE_FINISH, this::perform, this::finishPerform); + } + + /** + * @return {@code True} if operation is still in progress. + */ + public boolean started() { Review comment: `started` -> `inProgress`? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GroupKeyChangeProcess.java ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.encryption; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteFeatures; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.encryption.GridEncryptionManager.EmptyResult; +import org.apache.ignite.internal.managers.encryption.GridEncryptionManager.KeyChangeFuture; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteFutureCancelledException; + +import static 
org.apache.ignite.internal.IgniteFeatures.CACHE_GROUP_KEY_CHANGE; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CACHE_GROUP_KEY_CHANGE_FINISH; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CACHE_GROUP_KEY_CHANGE_PREPARE; + +/** + * Two phase distributed process, that performs cache group encryption key rotation. + */ +class GroupKeyChangeProcess { Review comment: This process changes active cache group key and initiates re-encryption process, perhaps it's better to change process name, since for example `GroupKeyChangeProcess.finished()` looks confusing and can be treated as "re-encryption process finished". At least javadoc should be added. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -627,22 +716,104 @@ public void groupKey(int grpId, byte[] encGrpKey) { return withMasterKeyChangeReadLock(() -> getSpi().getMasterKeyName()); } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> changeCacheGroupKey(Collection<String> cacheOrGrpNames) { + A.notEmpty(cacheOrGrpNames, "cacheOrGrpNames"); + + synchronized (opsMux) { + if (stopped) { + return new IgniteFinishedFutureImpl<>(new IgniteException("Cache group key change was rejected. " + + "Node is stopping.")); + } + + if (masterKeyChangeFut != null && !masterKeyChangeFut.isDone()) { + return new IgniteFinishedFutureImpl<>(new IgniteException("Cache group key change was rejected. " + + "The master key change is in progress.")); + } + + return grpKeyChangeProc.start(cacheOrGrpNames); + } + } + + /** + * @param grpIds Cache group IDs. + * @param keyIds Encryption key IDs. + * @param keys Encryption keys. + * @throws IgniteCheckedException If failed. 
+ */ + protected void changeCacheGroupKeyLocal(int[] grpIds, byte[] keyIds, byte[][] keys) throws IgniteCheckedException { + Map<Integer, Byte> encryptionStatus = U.newHashMap(grpIds.length); + + for (int i = 0; i < grpIds.length; i++) + encryptionStatus.put(grpIds[i], keyIds[i]); + + WALPointer ptr = ctx.cache().context().wal().log(new ReencryptionStatusRecord(encryptionStatus)); + + if (ptr != null) + ctx.cache().context().wal().flush(ptr, false); + + for (int i = 0; i < grpIds.length; i++) { + int grpId = grpIds[i]; + + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); + + if (grp == null) + continue; + + GroupKeyEncrypted key = new GroupKeyEncrypted(keyIds[i] & 0xff, keys[i]); + + synchronized (metaStorageMux) { + // Store new key as inactive for recovery. + grpKeys.addKey(grpId, key); + + writeToMetaStore(grpId, true, false); + + // Set new key as key for writing. + GroupKey prevGrpKey = grpKeys.changeActiveKey(grpId, key); + + assert prevGrpKey != null && prevGrpKey.id() != key.id() : "prev=" + prevGrpKey + ", currId=" + key.id(); + + grpKeys.reserveWalKey(grpId, prevGrpKey.unsignedId(), ctx.cache().context().wal().currentSegment()); + + writeToMetaStore(grpId, true, true); Review comment: Why can't we write to the metastore once? What problem do we solve by these two separate calls? Also, it's the only place where `writeToMetaStore(grp, true, true)` is used; I think writeToMetaStore can be split into two methods, writeKeysToMetaStore and writeTrackedWalSegmentsToMetaStore (or something like that), for better readability. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -676,6 +869,57 @@ public void onCacheGroupDestroyed(int grpId) { removeGroupKey(grpId); } + /** + * @param grp Cache group. + * @param partId Partition ID.
+ */ + public void onDestroyPartitionStore(CacheGroupContext grp, int partId) { + pageScanner.cancel(grp.groupId(), partId); + + setEncryptionState(grp, partId, 0, 0); + } + + /** + * Callabck when WAL segment is removed. + * + * @param segmentIdx WAL segment index. + */ + public void onWalSegmentRemoved(long segmentIdx) { + Map<Integer, Set<Integer>> rmvKeys = grpKeys.releaseWalKeys(segmentIdx); + + if (F.isEmpty(rmvKeys)) + return; + + synchronized (metaStorageMux) { + try { + writeToMetaStore(0, false, true); + + for (Map.Entry<Integer, Set<Integer>> entry : rmvKeys.entrySet()) { + int grpId = entry.getKey(); Review comment: `int` -> `Integer` to avoid boxing on `containsKey` ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -767,6 +1050,36 @@ public void onCacheGroupDestroyed(int grpId) { } } + /** + * Set reencryption status for partition. + * + * @param grp Cache group. + * @param partId Partition ID. + * @param idx Index of the last reencrypted page. + * @param total Total pages to be reencrypted. 
+ */ + public void setEncryptionState(CacheGroupContext grp, int partId, int idx, int total) { + long[] states = reencryptGroups.computeIfAbsent(grp.groupId(), v -> new long[grp.affinity().partitions() + 1]); + + states[Math.min(partId, states.length - 1)] = ReencryptStateUtils.state(idx, total); Review comment: Let's add a comment about index partition and `Math.min(partId, states.length - 1)` hack ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -627,22 +716,104 @@ public void groupKey(int grpId, byte[] encGrpKey) { return withMasterKeyChangeReadLock(() -> getSpi().getMasterKeyName()); } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> changeCacheGroupKey(Collection<String> cacheOrGrpNames) { + A.notEmpty(cacheOrGrpNames, "cacheOrGrpNames"); + + synchronized (opsMux) { + if (stopped) { + return new IgniteFinishedFutureImpl<>(new IgniteException("Cache group key change was rejected. " + + "Node is stopping.")); + } + + if (masterKeyChangeFut != null && !masterKeyChangeFut.isDone()) { + return new IgniteFinishedFutureImpl<>(new IgniteException("Cache group key change was rejected. " + + "The master key change is in progress.")); + } + + return grpKeyChangeProc.start(cacheOrGrpNames); + } + } + + /** + * @param grpIds Cache group IDs. + * @param keyIds Encryption key IDs. + * @param keys Encryption keys. + * @throws IgniteCheckedException If failed. 
+ */ + protected void changeCacheGroupKeyLocal(int[] grpIds, byte[] keyIds, byte[][] keys) throws IgniteCheckedException { + Map<Integer, Byte> encryptionStatus = U.newHashMap(grpIds.length); + + for (int i = 0; i < grpIds.length; i++) + encryptionStatus.put(grpIds[i], keyIds[i]); + + WALPointer ptr = ctx.cache().context().wal().log(new ReencryptionStatusRecord(encryptionStatus)); + + if (ptr != null) + ctx.cache().context().wal().flush(ptr, false); + + for (int i = 0; i < grpIds.length; i++) { + int grpId = grpIds[i]; + + CacheGroupContext grp = ctx.cache().cacheGroup(grpId); + + if (grp == null) + continue; + + GroupKeyEncrypted key = new GroupKeyEncrypted(keyIds[i] & 0xff, keys[i]); + + synchronized (metaStorageMux) { + // Store new key as inactive for recovery. + grpKeys.addKey(grpId, key); + + writeToMetaStore(grpId, true, false); + + // Set new key as key for writing. + GroupKey prevGrpKey = grpKeys.changeActiveKey(grpId, key); + + assert prevGrpKey != null && prevGrpKey.id() != key.id() : "prev=" + prevGrpKey + ", currId=" + key.id(); + + grpKeys.reserveWalKey(grpId, prevGrpKey.unsignedId(), ctx.cache().context().wal().currentSegment()); + + writeToMetaStore(grpId, true, true); + } + + reencryptGroups.put(grpId, pageScanner.pagesCount(grp)); + + if (log.isInfoEnabled()) + log.info("New encryption key for group was added [grpId=" + grpId + ", keyId=" + key.id() + "]"); + } + + startReencryption(encryptionStatus.keySet()); + } + + /** + * @param grpId Cache group ID. + * @return Future that will be completed when reencryption of the specified group is finished. + */ + public IgniteInternalFuture<Void> reencryptionFuture(int grpId) { + if (pageScanner.disabled() && reencryptGroups.containsKey(grpId)) + return new GridFutureAdapter<>(); Review comment: This future will never be finished, do we really need unfinished future here? 
########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -194,10 +207,22 @@ * Master key change prepare process. Checks that all server nodes have the same new master key and then starts * finish process. */ - private DistributedProcess<MasterKeyChangeRequest, MasterKeyChangeResult> prepareMKChangeProc; + private DistributedProcess<MasterKeyChangeRequest, EmptyResult> prepareMKChangeProc; /** Process to perform the master key change. Changes master key and reencrypt group keys. */ - private DistributedProcess<MasterKeyChangeRequest, MasterKeyChangeResult> performMKChangeProc; + private DistributedProcess<MasterKeyChangeRequest, EmptyResult> performMKChangeProc; + + /** Two phase distributed process, that performs cache group encryption key rotation. */ + private GroupKeyChangeProcess grpKeyChangeProc; + + /** Cache groups for which encryption key was changed, and they must be re-encrypted. */ + private final Map<Integer, long[]> reencryptGroups = new ConcurrentHashMap<>(); + + /** Cache groups for which encryption key was changed on node join. */ + private final Map<Integer, Integer> reencryptGroupsForced = new ConcurrentHashMap<>(); Review comment: Why do we only add entries to this map and never clean it up? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java ########## @@ -655,19 +826,41 @@ private void removeGroupKey(int grpId) { /** * Callback for cache group start event. - * @param grpId Group id. + * + * @param grpId Cache group ID. * @param encKey Encryption key */ public void beforeCacheGroupStart(int grpId, @Nullable byte[] encKey) { if (encKey == null || ctx.clientNode()) return; - groupKey(grpId, encKey); + addGroupKey(grpId, new GroupKeyEncrypted(INITIAL_KEY_ID, encKey)); + } + + /** + * Callback is called before invalidate page memory. + * + * @param grpId Cache group ID. 
+ */ + public void onCacheGroupStop(int grpId) { + IgniteInternalFuture<Void> fut = reencryptionFuture(grpId); + + if (!fut.isDone()) { Review comment: `fut.isDone()` - redundant ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
