Repository: ignite Updated Branches: refs/heads/ignite-zk 4090eb717 -> a6b452823
IGNITE-6931: SQL: simplify index rebuild. Now both CREATE INDEX and "rebuild from hash" routines use the same iteration and entry locking logic. This closes #3052. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65080675 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65080675 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65080675 Branch: refs/heads/ignite-zk Commit: 65080675dec8433c0968762c903905d1f65e5f29 Parents: db343b6 Author: Igor Seliverstov <gvvinbl...@gmail.com> Authored: Tue Nov 21 12:26:23 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Tue Nov 21 12:26:23 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 13 +-- .../processors/cache/GridCacheMapEntry.java | 17 +-- .../cache/IgniteCacheOffheapManager.java | 19 ---- .../cache/IgniteCacheOffheapManagerImpl.java | 21 ---- .../persistence/GridCacheOffheapManager.java | 7 -- .../processors/query/GridQueryProcessor.java | 51 ++++++++- .../query/schema/SchemaIndexCacheFilter.java | 33 ++++++ .../schema/SchemaIndexCacheVisitorImpl.java | 108 +++++++++---------- .../processors/cache/GridCacheTestEntryEx.java | 10 +- .../processors/query/h2/IgniteH2Indexing.java | 64 ++++------- 10 files changed, 166 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 6da7bc7..313a0c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.util.lang.GridTuple3; import org.jetbrains.annotations.Nullable; @@ -869,12 +870,13 @@ public interface GridCacheEntryEx { /** * Update index from within entry lock, passing key, value, and expiration time to provided closure. * + * @param filter Row filter. * @param clo Closure to apply to key, value, and expiration time. * @throws IgniteCheckedException If failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException, - GridCacheEntryRemovedException; + public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure clo) + throws IgniteCheckedException, GridCacheEntryRemovedException; /** * @return Expire time, without accounting for transactions or removals. @@ -919,13 +921,6 @@ public interface GridCacheEntryEx { public void updateTtl(@Nullable GridCacheVersion ver, long ttl) throws GridCacheEntryRemovedException; /** - * Ensures that the value stored in the entry is also inserted in the indexing. - * - * @throws GridCacheEntryRemovedException If entry was removed. - */ - public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException; - - /** * @return Value. * @throws IgniteCheckedException If failed to read from swap storage. * @throws GridCacheEntryRemovedException If entry was removed. http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 778a46e..79390ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridClosureException; @@ -3141,16 +3142,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException { - synchronized (this) { - checkObsolete(); - - if (cctx.queries().enabled()) - cctx.offheap().updateIndexes(cctx, key, localPartition()); - } - } - - /** {@inheritDoc} */ @Override public synchronized CacheObject valueBytes() throws GridCacheEntryRemovedException { checkObsolete(); @@ -3300,8 +3291,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException, - GridCacheEntryRemovedException { + @Override public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure clo) + throws IgniteCheckedException, GridCacheEntryRemovedException { synchronized (this) { if (isInternal()) return; @@ -3310,7 +3301,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheDataRow row = cctx.offheap().read(this); - if (row != null) + if (row != null && (filter == null || filter.apply(row))) clo.apply(row); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 761b787..84c69a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -186,18 +186,6 @@ public interface IgniteCacheOffheapManager { /** * @param cctx Cache context. * @param key Key. - * @param part Partition. - * @throws IgniteCheckedException If failed. - */ - public void updateIndexes( - GridCacheContext cctx, - KeyCacheObject key, - GridDhtLocalPartition part - ) throws IgniteCheckedException; - - /** - * @param cctx Cache context. - * @param key Key. * @param partId Partition number. * @param part Partition. * @throws IgniteCheckedException If failed. @@ -454,13 +442,6 @@ public interface IgniteCacheOffheapManager { /** * @param cctx Cache context. * @param key Key. - * @throws IgniteCheckedException If failed. - */ - void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException; - - /** - * @param cctx Cache context. - * @param key Key. * @param c Closure. * @throws IgniteCheckedException If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index c85ba1d..370a92e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -359,12 +359,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part) - throws IgniteCheckedException { - dataStore(part).updateIndexes(cctx, key); - } - - /** {@inheritDoc} */ @Override public void remove( GridCacheContext cctx, KeyCacheObject key, @@ -1362,21 +1356,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { - int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - - CacheDataRow row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); - - if (row != null) { - row.key(key); - - GridCacheQueryManager qryMgr = cctx.queries(); - - qryMgr.store(row, null, false); - } - } - - /** {@inheritDoc} */ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 6ed62f8..cfa1829 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1236,13 +1236,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { - CacheDataStore delegate = init0(false); - - delegate.updateIndexes(cctx, key); - } - - /** {@inheritDoc} */ @Override public CacheDataRow createRow( GridCacheContext cctx, KeyCacheObject key, http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index ed5fdd9..471888a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl; import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken; @@ -96,10 +97,12 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.util.worker.GridWorkerFuture; @@ -1319,12 +1322,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { if (op instanceof SchemaIndexCreateOperation) { - SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; + final SchemaIndexCreateOperation op0 = (SchemaIndexCreateOperation)op; QueryIndexDescriptorImpl idxDesc = QueryUtils.createIndexDescriptor(type, op0.index()); - SchemaIndexCacheVisitor visitor = - new SchemaIndexCacheVisitorImpl(this, cache.context(), cacheName, op0.tableName(), cancelTok); + GridCacheContext cctx = cache.context(); + + SchemaIndexCacheFilter filter = new TableCacheFilter(cctx, op0.tableName()); + + SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx, filter, cancelTok); idx.dynamicIndexCreate(op0.schemaName(), op0.tableName(), idxDesc, op0.ifNotExists(), visitor); } @@ -2869,4 +2875,43 @@ public class GridQueryProcessor extends GridProcessorAdapter { this.mgr = mgr; } } + + /** */ + private static class TableCacheFilter implements SchemaIndexCacheFilter { + /** */ + @GridToStringExclude + private final GridCacheContext cctx; + + /** */ + @GridToStringExclude + private final GridQueryProcessor query; + + /** */ + private final String cacheName; + + /** */ + private final String tableName; + + /** + * @param cctx Cache context. + * @param tableName Target table name. + */ + TableCacheFilter(GridCacheContext cctx, String tableName) { + this.cctx = cctx; + this.tableName = tableName; + + cacheName = cctx.name(); + query = cctx.kernalContext().query(); + } + + /** {@inheritDoc} */ + @Override public boolean apply(CacheDataRow row) throws IgniteCheckedException { + return query.belongsToTable(cctx, cacheName, tableName, row.key(), row.value()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TableCacheFilter.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java new file mode 100644 index 0000000..32600c6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheFilter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; + +/** + * Index row filter accepting current entry. + */ +public interface SchemaIndexCacheFilter { + /** + * @param row Cache data row. + * @return {@code True} if row passes the filter. + * @throws IgniteCheckedException If failed. + */ + boolean apply(CacheDataRow row) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java index fda7d1d..c11c614 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.query.schema; -import java.util.Collection; +import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; -import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.internal.S; @@ -40,35 +39,36 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Traversor operating all primary and backup partitions of given cache. */ +@SuppressWarnings("ForLoopReplaceableByForEach") public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { - /** Query procssor. */ - private final GridQueryProcessor qryProc; + /** Count of rows, being processed within a single checkpoint lock. */ + private static final int BATCH_SIZE = 1000; /** Cache context. */ private final GridCacheContext cctx; - /** Cache name. */ - private final String cacheName; - - /** Table name. */ - private final String tblName; + /** Row filter. */ + private final SchemaIndexCacheFilter rowFilter; /** Cancellation token. */ private final SchemaIndexOperationCancellationToken cancel; /** * Constructor. - * - * @param cctx Cache context. - * @param cacheName Cache name. - * @param tblName Table name. + * @param cctx Cache context. + */ + public SchemaIndexCacheVisitorImpl(GridCacheContext cctx) { + this(cctx, null, null); + } + + /** + * Constructor. + * @param cctx Cache context. * @param cancel Cancellation token. */ - public SchemaIndexCacheVisitorImpl(GridQueryProcessor qryProc, GridCacheContext cctx, String cacheName, - String tblName, SchemaIndexOperationCancellationToken cancel) { - this.qryProc = qryProc; - this.cacheName = cacheName; - this.tblName = tblName; + public SchemaIndexCacheVisitorImpl(GridCacheContext cctx, SchemaIndexCacheFilter rowFilter, + SchemaIndexOperationCancellationToken cancel) { + this.rowFilter = rowFilter; this.cancel = cancel; if (cctx.isNear()) @@ -81,12 +81,10 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { @Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException { assert clo != null; - FilteringVisitorClosure filterClo = new FilteringVisitorClosure(clo); - - Collection<GridDhtLocalPartition> parts = cctx.topology().localPartitions(); + List<GridDhtLocalPartition> parts = cctx.topology().localPartitions(); - for (GridDhtLocalPartition part : parts) - processPartition(part, filterClo); + for (int i = 0, size = parts.size(); i < size; i++) + processPartition(parts.get(i), clo); } /** @@ -96,7 +94,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { * @param clo Index closure. * @throws IgniteCheckedException If failed. */ - private void processPartition(GridDhtLocalPartition part, FilteringVisitorClosure clo) + private void processPartition(GridDhtLocalPartition part, SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException { checkCancelled(); @@ -114,15 +112,35 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { null, CacheDataRowAdapter.RowData.KEY_ONLY); - while (cursor.next()) { - CacheDataRow row = cursor.get(); + boolean locked = false; + + try { + int cntr = 0; + + while (cursor.next()) { + KeyCacheObject key = cursor.get().key(); + + if (!locked) { + cctx.shared().database().checkpointReadLock(); - KeyCacheObject key = row.key(); + locked = true; + } - processKey(key, clo); + processKey(key, clo); - if (part.state() == RENTING) - break; + if (++cntr % BATCH_SIZE == 0) { + cctx.shared().database().checkpointReadUnlock(); + + locked = false; + } + + if (part.state() == RENTING) + break; + } + } + finally { + if (locked) + cctx.shared().database().checkpointReadUnlock(); } } finally { @@ -137,7 +155,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { * @param clo Closure. * @throws IgniteCheckedException If failed. */ - private void processKey(KeyCacheObject key, FilteringVisitorClosure clo) throws IgniteCheckedException { + private void processKey(KeyCacheObject key, SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException { while (true) { try { checkCancelled(); @@ -145,7 +163,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { GridCacheEntryEx entry = cctx.cache().entryEx(key); try { - entry.updateIndex(clo); + entry.updateIndex(rowFilter, clo); } finally { cctx.evicts().touch(entry, AffinityTopologyVersion.NONE); @@ -168,7 +186,7 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { * @throws IgniteCheckedException If cancelled. */ private void checkCancelled() throws IgniteCheckedException { - if (cancel.isCancelled()) + if (cancel != null && cancel.isCancelled()) throw new IgniteCheckedException("Index creation was cancelled."); } @@ -176,28 +194,4 @@ public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { @Override public String toString() { return S.toString(SchemaIndexCacheVisitorImpl.class, this); } - - /** - * Filtering visitor closure. - */ - private class FilteringVisitorClosure implements SchemaIndexCacheVisitorClosure { - - /** Target closure. */ - private final SchemaIndexCacheVisitorClosure target; - - /** - * Constructor. - * - * @param target Target. - */ - FilteringVisitorClosure(SchemaIndexCacheVisitorClosure target) { - this.target = target; - } - - /** {@inheritDoc} */ - @Override public void apply(CacheDataRow row) throws IgniteCheckedException { - if (qryProc.belongsToTable(cctx, cacheName, tblName, row.key(), row.value())) - target.apply(row); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 2ba8fd8..3d7edac 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; @@ -817,11 +818,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ - @Override public void ensureIndexed() throws GridCacheEntryRemovedException, IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ @Override public CacheObject unswap() throws IgniteCheckedException { return null; } @@ -842,8 +838,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** {@inheritDoc} */ - @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException, - GridCacheEntryRemovedException { + @Override public void updateIndex(SchemaIndexCacheFilter filter, SchemaIndexCacheVisitorClosure clo) + throws IgniteCheckedException, GridCacheEntryRemovedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/65080675/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index c37e5f0..f3a95a5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -65,17 +65,13 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; @@ -116,6 +112,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExe import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.sql.SqlParseException; import org.apache.ignite.internal.sql.SqlParser; @@ -2021,44 +2018,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId); - IgniteCacheOffheapManager offheapMgr = cctx.isNear() ? cctx.near().dht().context().offheap() : cctx.offheap(); + final GridCacheQueryManager qryMgr = cctx.queries(); - for (int p = 0; p < cctx.affinity().partitions(); p++) { - try (GridCloseableIterator<KeyCacheObject> keyIter = offheapMgr.cacheKeysIterator(cctx.cacheId(), p)) { - while (keyIter.hasNext()) { - cctx.shared().database().checkpointReadLock(); + SchemaIndexCacheVisitor visitor = new SchemaIndexCacheVisitorImpl(cctx); - try { - KeyCacheObject key = keyIter.next(); - - while (true) { - GridCacheEntryEx entry = null; - - try { - entry = cctx.isNear() ? - cctx.near().dht().entryEx(key) : cctx.cache().entryEx(key); - - entry.ensureIndexed(); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - // Retry. - } - catch (GridDhtInvalidPartitionException ignore) { - break; - } - finally { - entry.context().evicts().touch(entry, AffinityTopologyVersion.NONE); - } - } - } - finally { - cctx.shared().database().checkpointReadUnlock(); - } - } - } - } + visitor.visit(new RebuldIndexFromHashClosure(qryMgr)); for (H2TableDescriptor tblDesc : tables(cacheName)) tblDesc.table().markRebuildFromHashInProgress(false); @@ -2653,4 +2617,22 @@ public class IgniteH2Indexing implements GridQueryIndexing { private interface ClIter<X> extends AutoCloseable, Iterator<X> { // No-op. } + + /** */ + private static class RebuldIndexFromHashClosure implements SchemaIndexCacheVisitorClosure { + /** */ + private final GridCacheQueryManager qryMgr; + + /** + * @param qryMgr Query manager. + */ + RebuldIndexFromHashClosure(GridCacheQueryManager qryMgr) { + this.qryMgr = qryMgr; + } + + /** {@inheritDoc} */ + @Override public void apply(CacheDataRow row) throws IgniteCheckedException { + qryMgr.store(row, null, false); + } + } }