alex-plekhanov commented on a change in pull request #9118: URL: https://github.com/apache/ignite/pull/9118#discussion_r657763218
########## File path: modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import javax.cache.Cache; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.lang.IgniteExperimental; +import org.jetbrains.annotations.Nullable; + +/** + * Index query runs over internal index structure and returns cache entries for index rows that match specified criteria. + */ +@IgniteExperimental +public class IndexQuery<K, V> extends Query<Cache.Entry<K, V>> { Review comment: Lets make class `final` ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.List; +import java.util.Objects; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class IndexQueryLocalTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final int CNT = 10_000; + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** Should return full data. */ + @Test + public void testServerNodeReplicatedCache() throws Exception { + Ignite crd = startGrids(4); + + IgniteCache cache = crd.getOrCreateCache(ccfg(CacheMode.REPLICATED)); + + insertData(crd, cache); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", CNT / 2)); + + for (int i = 0; i < 4; i++) { + cache = grid(i).cache(CACHE); + + List result = cache.query(qry.setLocal(true)).getAll(); + + assertEquals(CNT / 2, result.size()); + } + } + + /** Should return part of data only. */ + @Test + public void testServerNodePartitionedCache() throws Exception { + Ignite crd = startGrids(4); + + IgniteCache cache = crd.getOrCreateCache(ccfg(CacheMode.PARTITIONED)); + + insertData(crd, cache); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", CNT / 2)); + + for (int i = 0; i < 4; i++) { + cache = grid(i).cache(CACHE); + + List result = cache.query(qry.setLocal(true)).getAll(); + + assertTrue(CNT / 2 > result.size()); Review comment: Lets also check that sum of `result.size()` from all nodes equals to `CNT / 2 * (backups + 1)` ########## File path: modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java ########## @@ -377,6 +377,28 @@ public IndexesRebuildTask idxRebuild() { } } + /** + * Returns index for specified name. + * + * @param idxName Index name. + * @return Index for specified index name. + */ + public Index index(IndexName idxName) { + ddlLock.readLock().lock(); + + try { + Map<String, Index> idxs = cacheToIdx.get(idxName.cacheName()); + + if (idxs == null) + return null; + + return idxs.get(idxName.fullName()); + + } finally { Review comment: NL after `}` ########## File path: modules/core/src/main/java/org/apache/ignite/cache/query/IndexQueryCriteria.java ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.io.Serializable; +import java.util.List; + +/** + * Basic interface for all {@link IndexQuery} criteria. + */ +public interface IndexQueryCriteria extends Serializable { + /** + * @return List of fields that this query criteria applies to. + */ + public abstract List<String> fields(); Review comment: `abstract` keyword seems redundant. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ########## @@ -483,11 +523,18 @@ public ClusterGroup projection() { return part; } + /** + * @return Index query description. + */ + @Nullable public IndexQueryDesc idxQryDesc() { return idxQryDesc; } + /** * @throws IgniteCheckedException If query is invalid. */ public void validate() throws IgniteCheckedException { - if ((type != SCAN && type != SET && type != SPI) && !QueryUtils.isEnabled(cctx.config())) + if ((type != SCAN && type != SET && type != SPI && type != INDEX) + && !QueryUtils.isEnabled(cctx.config())) + Review comment: redundant NL ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryQueryEntityTest.java ########## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class IndexQueryQueryEntityTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final String CACHE_TBL_NAME = "TEST_CACHE_TBL_NAME"; + + /** */ + private static final String TABLE = "TEST_TABLE"; + + /** */ + private static final String ID_IDX = "ID_IDX"; + + /** */ + private static final String DESC_ID_IDX = "DESC_ID_IDX"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private IgniteCache<Long, Person> cache; + + /** */ + private IgniteCache<Long, Person> cacheTblName; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignite crd = startGrids(4); + + cache = crd.cache(CACHE); + cacheTblName = crd.cache(CACHE_TBL_NAME); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + QueryIndex idIdx = new QueryIndex("id", true, ID_IDX); + QueryIndex descIdIdx = new QueryIndex("descId", false, DESC_ID_IDX); + + QueryEntity e = new QueryEntity(Long.class.getName(), Person.class.getName()) + .setFields(new LinkedHashMap<>( + F.asMap("id", Integer.class.getName(), "descId", Integer.class.getName())) + ) + .setIndexes(Arrays.asList(idIdx, descIdIdx)); + + CacheConfiguration<?, ?> ccfg1 = new CacheConfiguration<>() + .setName(CACHE) + .setQueryEntities(Collections.singletonList(e)); + + QueryEntity entTableName = new QueryEntity(e); + entTableName.setTableName(TABLE); + + CacheConfiguration<?, ?> ccfg2 = new CacheConfiguration<>() + .setName(CACHE_TBL_NAME) + .setQueryEntities(Collections.singletonList(entTableName)); + + cfg.setCacheConfiguration(ccfg1, ccfg2); + + return cfg; + } + + /** */ + @Test + public void testEmptyCache() { + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", Integer.MAX_VALUE)); + + assertTrue(cache.query(qry).getAll().isEmpty()); + + qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("descId", Integer.MAX_VALUE)); + + assertTrue(cache.query(qry).getAll().isEmpty()); + + qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", Integer.MAX_VALUE)); + + assertTrue(cache.query(qry).getAll().isEmpty()); + + qry = new IndexQuery<Long, Person>(Person.class) Review comment: Why do we need to check this and the previous query twice? ########## File path: modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java ########## @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cache.query.index; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.IndexQuery; +import org.apache.ignite.cache.query.IndexQueryCriteria; +import org.apache.ignite.cache.query.IndexQueryCriteriaBuilder; +import org.apache.ignite.internal.cache.query.RangeIndexQueryCriteria; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRow; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator; +import org.apache.ignite.internal.cache.query.index.sorted.IndexSearchRowImpl; +import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler; +import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext; +import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.cache.query.index.SortOrder.DESC; + +/** + * Processor of {@link IndexQuery}. + */ +public class IndexQueryProcessor { + /** */ + private final IndexProcessor idxProc; + + /** */ + public IndexQueryProcessor(IndexProcessor idxProc) { + this.idxProc = idxProc; + } + + /** Run query on local node. */ + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocal( + GridCacheContext<K, V> cctx, IndexQueryDesc idxQryDesc, IndexQueryContext qryCtx, boolean keepBinary) + throws IgniteCheckedException { + + Index idx = index(cctx, idxQryDesc); + + if (idx == null) + throw new IgniteCheckedException( + "No index matches index query. Cache=" + cctx.name() + "; Qry=" + idxQryDesc); + + GridCursor<IndexRow> cursor = query(cctx, idx, idxQryDesc.criteria(), qryCtx); + + // Map IndexRow to Cache Key-Value pair. + return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + private IndexRow currVal; + + private final CacheObjectContext coctx = cctx.cacheObjectContext(); + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (currVal != null) + return true; + + if (!cursor.next()) + return false; + + currVal = cursor.get(); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteBiTuple<K, V> onNext() { + if (currVal == null) + if (!hasNext()) + throw new NoSuchElementException(); + + IndexRow row = currVal; + + currVal = null; + + K k = (K) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().key(), keepBinary, false); + V v = (V) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().value(), keepBinary, false); + + return new IgniteBiTuple<>(k, v); + } + }; + } + + /** Get index to run query by specified description. */ + private Index index(GridCacheContext cctx, IndexQueryDesc idxQryDesc) throws IgniteCheckedException { + Class<?> valCls = idxQryDesc.valCls() != null ? loadValClass(cctx, idxQryDesc.valCls()) : null; + + String tableName = cctx.kernalContext().query().tableName(cctx.name(), valCls); + + if (tableName == null) + return null; + + // Find index by specified name. + if (idxQryDesc.idxName() != null) { + String name = idxQryDesc.idxName(); + + if (!QueryUtils.PRIMARY_KEY_INDEX.equals(name)) + name = name.toUpperCase(); + + String schema = cctx.kernalContext().query().schemaName(cctx); + + IndexName idxName = new IndexName(cctx.name(), schema, tableName, name); + + Index idx = idxProc.index(idxName); + + if (idx == null) + return null; + + return checkIndex(idxProc.indexDefinition(idx.id()), idxQryDesc.criteria()) ? idx : null; + } + + // Try get index by list of fields to query. + // Check all indexes by cache to find best index match: count of index fields equals to count of index criteria fields. + Collection<Index> idxs = idxProc.indexes(cctx); + + Index idx = null; + int idxFieldsCnt = 0; + + for (Index i: idxs) { + IndexDefinition idxDef = idxProc.indexDefinition(i.id()); + + if (!tableName.equals(idxDef.idxName().tableName())) + continue; + + int fldsCnt = idxDef.indexKeyDefinitions().size(); + + if (checkIndex(idxDef, idxQryDesc.criteria())) { + if (idx == null) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else if (fldsCnt < idxFieldsCnt) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else continue; + + // Best match. Index query criteria matches full index. + if (idxQryDesc.criteria().fields().size() == idxDef.indexKeyDefinitions().size()) + break; + } + } + + return idx; + } + + /** Checks that specified index matches index query criteria. */ + private boolean checkIndex(IndexDefinition idxDef, IndexQueryCriteria criteria) { + if (criteria.fields().size() > idxDef.indexKeyDefinitions().size()) + return false; + + for (int i = 0; i < criteria.fields().size(); i++) { + if (!idxDef.indexKeyDefinitions().get(i).name().equalsIgnoreCase(criteria.fields().get(i))) + return false; + } + + return true; + } + + /** */ + private Class<?> loadValClass(GridCacheContext cctx, String valClsName) throws IgniteCheckedException { + try { + ClassLoader clsLdr = U.resolveClassLoader(cctx.kernalContext().config()); + + return clsLdr.loadClass(valClsName); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException("No cache serves class: " + valClsName); + } + } + + /** Runs a query and return single cursor or cursor over multiple index segments. */ + private GridCursor<IndexRow> query(GridCacheContext cctx, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + int segmentsCnt = cctx.isPartitioned() ? cctx.config().getQueryParallelism() : 1; + + if (segmentsCnt == 1) + return query(0, idx, criteria, qryCtx); + + final GridCursor<IndexRow>[] segments = new GridCursor[segmentsCnt]; + + // Actually it just traverse BPlusTree to find boundaries. It's too fast to parallelize this. + for (int i = 0; i < segmentsCnt; i++) + segments[i] = query(i, idx, criteria, qryCtx); + + return new SegmentedIndexCursor(segments, ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator()); + } + + /** Coordinate query criteria. */ + private GridCursor<IndexRow> query(int segment, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + if (criteria instanceof RangeIndexQueryCriteria) + return treeIndexRange((InlineIndex) idx, (RangeIndexQueryCriteria) criteria, segment, qryCtx); + + throw new IllegalStateException("Doesn't support index query criteria: " + criteria.getClass().getName()); + } + + /** + * Runs range query over specified segment. There are 2 steps to run query: + * 1. Traverse index by specified boundaries; + * 2. Scan over cursor and filter rows that doesn't match user criteria. + * + * Filtering is required in 2 cases: + * 1. Exclusion of one of boundaries, as idx.find() includes both of them; + * 2. To apply criteria on non-first index fields. Tree apply boundaries field by field, if first field match + * a boundary, then second field isn't checked within traversing. + */ + private GridCursor<IndexRow> treeIndexRange(InlineIndex idx, RangeIndexQueryCriteria criteria, int segment, + IndexQueryContext qryCtx) throws IgniteCheckedException { + + InlineIndexRowHandler hnd = idx.segment(0).rowHandler(); + CacheObjectContext coctx = idx.segment(0).cacheContext().cacheObjectContext(); + + IndexKey[] lowerBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + IndexKey[] upperBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + + boolean lowerAllNulls = true; + boolean upperAllNulls = true; + + List<RangeIndexQueryCriteria.RangeCriterion> treeCriteria = new ArrayList<>(); + + for (int i = 0; i < criteria.fields().size(); i++) { + String f = criteria.fields().get(i); + + IndexKeyDefinition def = hnd.indexKeyDefinitions().get(i); + + if (!def.name().equalsIgnoreCase(f)) + throw new IgniteCheckedException("Range query doesn't match index '" + idx.name() + "'"); + + RangeIndexQueryCriteria.RangeCriterion c = criteria.criteria().get(i); + + // If index is desc, then we need to swap boundaries as user declare criteria in straight manner. + // For example, there is an idx (int Val desc). It means that index stores data in reverse order (1 < 0). + // But user won't expect for criterion gt(1) to get 0 as result, instead user will use lt(1) for getting + // 0. Then we need to swap user criterion. + if (def.order().sortOrder() == DESC) + c = c.swap(); + + treeCriteria.add(c); + + IndexKey l = key(c.lower(), def, hnd.indexKeyTypeSettings(), coctx); + IndexKey u = key(c.upper(), def, hnd.indexKeyTypeSettings(), coctx); + + if (l != null) + lowerAllNulls = false; + + if (u != null) + upperAllNulls = false; + + lowerBounds[i] = l; + upperBounds[i] = u; + } + + IndexRow lower = lowerAllNulls ? null : new IndexSearchRowImpl(lowerBounds, hnd); + IndexRow upper = upperAllNulls ? null : new IndexSearchRowImpl(upperBounds, hnd); + + // Step 1. Traverse index. + GridCursor<IndexRow> findRes = idx.find(lower, upper, segment, qryCtx); + + // Step 2. Scan and filter. + return new GridCursor<IndexRow>() { + /** */ + private final IndexRowComparator rowCmp = ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator(); + + /** {@inheritDoc} */ + @Override public boolean next() throws IgniteCheckedException { + if (!findRes.next()) + return false; + + while (match(get(), lower, 1) || match(get(), upper, -1)) { + if (!findRes.next()) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public IndexRow get() throws IgniteCheckedException { + return findRes.get(); + } + + /** + * Matches index row, boundary and inclusion mask to decide whether this row will be excluded from result. + * + * @param row Result row to check. + * @param boundary Index search boundary. + * @param boundarySign {@code 1} for lower boundary and {@code -1} for upper boundary. + * @return {@code true} if specified row has to be excluded from result. + */ + private boolean match(IndexRow row, IndexRow boundary, int boundarySign) throws IgniteCheckedException { Review comment: Bad method name. Actually, it returns true if not matched, so perhaps `notMatch` will be better, or left name as `match` but reverse return value logic. ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryAllTypesTest.java ########## @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gt; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gte; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lte; + +/** */ +public class IndexQueryAllTypesTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private IgniteCache<Long, Person> cache; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignite crd = startGrids(2); + + cache = crd.cache(CACHE); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<Long, Person>() + .setName(CACHE) + .setIndexedTypes(Long.class, Person.class); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** */ + @Test + public void testRangeWithNulls() { + Function<Integer, Person> persGen = i -> { + Integer val = i < CNT / 10 ? null : i; + + return person("intNullId", val); + }; + + insertData(i -> i, persGen, CNT); + + int pivot = CNT / 5; + + // Should include nulls. + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("intNullId", pivot)); + + check(cache.query(qry), 0, CNT / 5, i -> i, persGen); + + // Should exclude nulls. + qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(gte("intNullId", 0)); + + check(cache.query(qry), CNT / 10, CNT, i -> i, persGen); + + // Should return only nulls. + qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("intNullId", 0)); + + check(cache.query(qry), 0, CNT / 10, i -> i, persGen); + + // Should return only nulls. + qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lte("intNullId", null)); + + check(cache.query(qry), 0, CNT / 10, i -> i, persGen); + + // Should return all non nulls. + qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(gt("intNullId", null)); + + check(cache.query(qry), CNT / 10, CNT, i -> i, persGen); Review comment: Lets also add check for `gte null` ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.List; +import java.util.Objects; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class IndexQueryLocalTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final int CNT = 10_000; + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** Should return full data. */ + @Test + public void testServerNodeReplicatedCache() throws Exception { + Ignite crd = startGrids(4); + + IgniteCache cache = crd.getOrCreateCache(ccfg(CacheMode.REPLICATED)); + + insertData(crd, cache); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", CNT / 2)); + + for (int i = 0; i < 4; i++) { + cache = grid(i).cache(CACHE); + + List result = cache.query(qry.setLocal(true)).getAll(); + + assertEquals(CNT / 2, result.size()); + } + } + + /** Should return part of data only. */ + @Test + public void testServerNodePartitionedCache() throws Exception { + Ignite crd = startGrids(4); + + IgniteCache cache = crd.getOrCreateCache(ccfg(CacheMode.PARTITIONED)); + + insertData(crd, cache); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", CNT / 2)); + + for (int i = 0; i < 4; i++) { + cache = grid(i).cache(CACHE); + + List result = cache.query(qry.setLocal(true)).getAll(); + + assertTrue(CNT / 2 > result.size()); + } + } + + /** Should fail as no data on nodes. */ + @Test + public void testClientNodeReplicatedCache() throws Exception { + startGrid(); + + Ignite cln = startClientGrid(1); + + IgniteCache cache = cln.getOrCreateCache(ccfg(CacheMode.REPLICATED)); + + insertData(cln, cache); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", CNT / 2)); + + GridTestUtils.assertThrows(null, () -> cache.query(qry.setLocal(true)).getAll(), Review comment: Do we have the same exception for such a case with other query types? Message text looks confusing. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ########## @@ -2277,6 +2285,21 @@ public boolean belongsToTable(GridCacheContext cctx, String expCacheName, String return true; } + /** + * Get table name by specified cache and cache value class. + * + * @param cacheName Cache name. + * @param valCls Value class. + * @return Table name or null if there is no match. + */ + public String tableName(String cacheName, Class<?> valCls) { Review comment: Lets annotate with `@Nullable` ########## File path: modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java ########## @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cache.query.index; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.IndexQuery; +import org.apache.ignite.cache.query.IndexQueryCriteria; +import org.apache.ignite.cache.query.IndexQueryCriteriaBuilder; +import org.apache.ignite.internal.cache.query.RangeIndexQueryCriteria; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRow; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator; +import org.apache.ignite.internal.cache.query.index.sorted.IndexSearchRowImpl; +import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler; +import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext; +import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.cache.query.index.SortOrder.DESC; + +/** + * Processor of {@link IndexQuery}. + */ +public class IndexQueryProcessor { + /** */ + private final IndexProcessor idxProc; + + /** */ + public IndexQueryProcessor(IndexProcessor idxProc) { + this.idxProc = idxProc; + } + + /** Run query on local node. */ + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocal( + GridCacheContext<K, V> cctx, IndexQueryDesc idxQryDesc, IndexQueryContext qryCtx, boolean keepBinary) + throws IgniteCheckedException { + + Index idx = index(cctx, idxQryDesc); + + if (idx == null) + throw new IgniteCheckedException( + "No index matches index query. Cache=" + cctx.name() + "; Qry=" + idxQryDesc); + + GridCursor<IndexRow> cursor = query(cctx, idx, idxQryDesc.criteria(), qryCtx); + + // Map IndexRow to Cache Key-Value pair. + return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + private IndexRow currVal; + + private final CacheObjectContext coctx = cctx.cacheObjectContext(); + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (currVal != null) + return true; + + if (!cursor.next()) + return false; + + currVal = cursor.get(); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteBiTuple<K, V> onNext() { + if (currVal == null) + if (!hasNext()) + throw new NoSuchElementException(); + + IndexRow row = currVal; + + currVal = null; + + K k = (K) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().key(), keepBinary, false); + V v = (V) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().value(), keepBinary, false); + + return new IgniteBiTuple<>(k, v); + } + }; + } + + /** Get index to run query by specified description. */ + private Index index(GridCacheContext cctx, IndexQueryDesc idxQryDesc) throws IgniteCheckedException { + Class<?> valCls = idxQryDesc.valCls() != null ? loadValClass(cctx, idxQryDesc.valCls()) : null; + + String tableName = cctx.kernalContext().query().tableName(cctx.name(), valCls); + + if (tableName == null) + return null; + + // Find index by specified name. + if (idxQryDesc.idxName() != null) { + String name = idxQryDesc.idxName(); + + if (!QueryUtils.PRIMARY_KEY_INDEX.equals(name)) + name = name.toUpperCase(); + + String schema = cctx.kernalContext().query().schemaName(cctx); + + IndexName idxName = new IndexName(cctx.name(), schema, tableName, name); + + Index idx = idxProc.index(idxName); + + if (idx == null) + return null; + + return checkIndex(idxProc.indexDefinition(idx.id()), idxQryDesc.criteria()) ? idx : null; + } + + // Try get index by list of fields to query. + // Check all indexes by cache to find best index match: count of index fields equals to count of index criteria fields. + Collection<Index> idxs = idxProc.indexes(cctx); + + Index idx = null; + int idxFieldsCnt = 0; + + for (Index i: idxs) { + IndexDefinition idxDef = idxProc.indexDefinition(i.id()); + + if (!tableName.equals(idxDef.idxName().tableName())) + continue; + + int fldsCnt = idxDef.indexKeyDefinitions().size(); + + if (checkIndex(idxDef, idxQryDesc.criteria())) { + if (idx == null) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else if (fldsCnt < idxFieldsCnt) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else continue; + + // Best match. Index query criteria matches full index. + if (idxQryDesc.criteria().fields().size() == idxDef.indexKeyDefinitions().size()) + break; + } + } + + return idx; + } + + /** Checks that specified index matches index query criteria. */ + private boolean checkIndex(IndexDefinition idxDef, IndexQueryCriteria criteria) { + if (criteria.fields().size() > idxDef.indexKeyDefinitions().size()) + return false; + + for (int i = 0; i < criteria.fields().size(); i++) { + if (!idxDef.indexKeyDefinitions().get(i).name().equalsIgnoreCase(criteria.fields().get(i))) + return false; + } + + return true; + } + + /** */ + private Class<?> loadValClass(GridCacheContext cctx, String valClsName) throws IgniteCheckedException { + try { + ClassLoader clsLdr = U.resolveClassLoader(cctx.kernalContext().config()); + + return clsLdr.loadClass(valClsName); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException("No cache serves class: " + valClsName); + } + } + + /** Runs a query and return single cursor or cursor over multiple index segments. */ + private GridCursor<IndexRow> query(GridCacheContext cctx, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + int segmentsCnt = cctx.isPartitioned() ? cctx.config().getQueryParallelism() : 1; + + if (segmentsCnt == 1) + return query(0, idx, criteria, qryCtx); + + final GridCursor<IndexRow>[] segments = new GridCursor[segmentsCnt]; + + // Actually it just traverse BPlusTree to find boundaries. It's too fast to parallelize this. + for (int i = 0; i < segmentsCnt; i++) + segments[i] = query(i, idx, criteria, qryCtx); + + return new SegmentedIndexCursor(segments, ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator()); + } + + /** Coordinate query criteria. */ + private GridCursor<IndexRow> query(int segment, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + if (criteria instanceof RangeIndexQueryCriteria) + return treeIndexRange((InlineIndex) idx, (RangeIndexQueryCriteria) criteria, segment, qryCtx); + + throw new IllegalStateException("Doesn't support index query criteria: " + criteria.getClass().getName()); + } + + /** + * Runs range query over specified segment. There are 2 steps to run query: + * 1. Traverse index by specified boundaries; + * 2. Scan over cursor and filter rows that doesn't match user criteria. + * + * Filtering is required in 2 cases: + * 1. Exclusion of one of boundaries, as idx.find() includes both of them; + * 2. To apply criteria on non-first index fields. Tree apply boundaries field by field, if first field match + * a boundary, then second field isn't checked within traversing. + */ + private GridCursor<IndexRow> treeIndexRange(InlineIndex idx, RangeIndexQueryCriteria criteria, int segment, + IndexQueryContext qryCtx) throws IgniteCheckedException { + + InlineIndexRowHandler hnd = idx.segment(0).rowHandler(); + CacheObjectContext coctx = idx.segment(0).cacheContext().cacheObjectContext(); + + IndexKey[] lowerBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + IndexKey[] upperBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + + boolean lowerAllNulls = true; + boolean upperAllNulls = true; + + List<RangeIndexQueryCriteria.RangeCriterion> treeCriteria = new ArrayList<>(); + + for (int i = 0; i < criteria.fields().size(); i++) { + String f = criteria.fields().get(i); + + IndexKeyDefinition def = hnd.indexKeyDefinitions().get(i); + + if (!def.name().equalsIgnoreCase(f)) + throw new IgniteCheckedException("Range query doesn't match index '" + idx.name() + "'"); + + RangeIndexQueryCriteria.RangeCriterion c = criteria.criteria().get(i); + + // If index is desc, then we need to swap boundaries as user declare criteria in straight manner. + // For example, there is an idx (int Val desc). It means that index stores data in reverse order (1 < 0). + // But user won't expect for criterion gt(1) to get 0 as result, instead user will use lt(1) for getting + // 0. Then we need to swap user criterion. + if (def.order().sortOrder() == DESC) + c = c.swap(); + + treeCriteria.add(c); + + IndexKey l = key(c.lower(), def, hnd.indexKeyTypeSettings(), coctx); + IndexKey u = key(c.upper(), def, hnd.indexKeyTypeSettings(), coctx); + + if (l != null) + lowerAllNulls = false; + + if (u != null) + upperAllNulls = false; + + lowerBounds[i] = l; + upperBounds[i] = u; + } + + IndexRow lower = lowerAllNulls ? null : new IndexSearchRowImpl(lowerBounds, hnd); + IndexRow upper = upperAllNulls ? null : new IndexSearchRowImpl(upperBounds, hnd); + + // Step 1. Traverse index. + GridCursor<IndexRow> findRes = idx.find(lower, upper, segment, qryCtx); + + // Step 2. Scan and filter. + return new GridCursor<IndexRow>() { + /** */ + private final IndexRowComparator rowCmp = ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator(); + + /** {@inheritDoc} */ + @Override public boolean next() throws IgniteCheckedException { + if (!findRes.next()) + return false; + + while (match(get(), lower, 1) || match(get(), upper, -1)) { + if (!findRes.next()) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public IndexRow get() throws IgniteCheckedException { + return findRes.get(); + } + + /** + * Matches index row, boundary and inclusion mask to decide whether this row will be excluded from result. + * + * @param row Result row to check. + * @param boundary Index search boundary. + * @param boundarySign {@code 1} for lower boundary and {@code -1} for upper boundary. + * @return {@code true} if specified row has to be excluded from result. + */ + private boolean match(IndexRow row, IndexRow boundary, int boundarySign) throws IgniteCheckedException { + // Unbounded search, include all. + if (boundary == null) + return false; + + int criteriaKeysCnt = treeCriteria.size(); + + for (int i = 0; i < criteriaKeysCnt; i++) { + RangeIndexQueryCriteria.RangeCriterion c = treeCriteria.get(i); + + // Include all values on this field. + if (boundary.key(i) == null) + continue; + + int cmp = rowCmp.compareKey(row, boundary, i); + + // Swap direction. + if (hnd.indexKeyDefinitions().get(i).order().sortOrder() == DESC) + cmp = -1 * cmp; + + // Exclude if field equals boundary field and criteria is excluding. + if (cmp == 0) { + if (boundarySign > 0 && !c.lowerIncl()) + return true; + + if (boundarySign < 0 && !c.upperIncl()) + return true; + } + + // Check sign. Exclude if field is out of boundaries. + if (cmp * boundarySign < 0) + return true; + } + + return false; + } + }; + } + + /** */ + private IndexKey key(Object val, IndexKeyDefinition def, IndexKeyTypeSettings settings, CacheObjectContext coctx) { + IndexKey key = null; + + if (val != null) { + if (val instanceof IndexQueryCriteriaBuilder.Null) + val = null; + + key = IndexKeyFactory.wrap( + val, def.idxType(), coctx, settings); + } + + return key; + } + + /** Single cursor over multiple segments. Next value is choose with the index row comparator. */ + private class SegmentedIndexCursor implements GridCursor<IndexRow> { + /** Cursors over segments. */ + private final GridCursor<IndexRow>[] cursors; + + /** Whether returns first value for user. */ + private boolean returnFirst; + + /** Offset of current segmented cursor to return value. */ + private int cursorOff; + + /** Comparator to compare index rows. */ + private final Comparator<GridCursor<IndexRow>> cursorComp; + + /** */ + SegmentedIndexCursor(GridCursor<IndexRow>[] cursors, IndexRowComparator rowCmp) { + this.cursors = cursors; + + cursorComp = new Comparator<GridCursor<IndexRow>>() { + @Override public int compare(GridCursor<IndexRow> o1, GridCursor<IndexRow> o2) { + try { + if (o1 == o2) + return 0; + + if (o1 == null) + return -1; + + if (o2 == null) + return 1; + + return rowCmp.compareKey(o1.get(), o2.get(), 0); + + } catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean next() throws IgniteCheckedException { + if (!returnFirst) { + for (int i = 0; i < cursors.length; i++) { + if (!cursors[i].next()) { + cursors[i] = null; + cursorOff++; + } + } + + if (cursorOff == cursors.length) + return false; + + Arrays.sort(cursors, cursorComp); + + returnFirst = true; + Review comment: Redundant NL ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQuerySqlIndexTest.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class IndexQuerySqlIndexTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final String CACHE_TABLE = "TEST_CACHE_TABLE"; + + /** */ + private static final String TABLE = "TEST_TABLE"; + + /** */ + private static final String DESC_ID_IDX = "DESC_ID_IDX"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private IgniteCache cache; + + /** */ + private Ignite crd; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + crd = startGrids(4); + + cache = crd.cache(CACHE); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration ccfg = new CacheConfiguration<>() + .setName(CACHE); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** */ + @Test + public void testEmptyCache() { + prepareTable(); + + IgniteCache tableCache = crd.cache(CACHE_TABLE); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class, DESC_ID_IDX) + .setCriteria(lt("descId", Integer.MAX_VALUE)); + + assertTrue(tableCache.query(qry).getAll().isEmpty()); + } + + /** */ + @Test + public void testRangeQueries() { + prepareTable(); + + insertData(); + + int pivot = new Random().nextInt(CNT); + + IgniteCache tableCache = crd.cache(CACHE_TABLE); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class, DESC_ID_IDX) + .setCriteria(lt("descId", pivot)); + + check(tableCache.query(qry), 0, pivot); + + // Wrong fields in query. + GridTestUtils.assertThrowsAnyCause(null, () -> { + IndexQuery<Long, Person> wrongQry = new IndexQuery<Long, Person>(Person.class, DESC_ID_IDX) + .setCriteria(lt("id", Integer.MAX_VALUE)); + + return cache.query(wrongQry).getAll(); + + }, IgniteCheckedException.class, "No index matches index query."); + + // Wrong cache name. + GridTestUtils.assertThrowsAnyCause(null, () -> { + IndexQuery<Long, Person> wrongQry = new IndexQuery<Long, Person>(Person.class, DESC_ID_IDX) + .setCriteria(lt("descId", Integer.MAX_VALUE)); + + return cache.query(wrongQry).getAll(); + + }, IgniteCheckedException.class, "No index matches index query."); + + // Wrong schema name. + GridTestUtils.assertThrowsAnyCause(null, () -> { Review comment: Looks like all three `assertThrowsAnyCause` in this test checks the same case (the same cache, the same schema) ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/MultifieldIndexQueryTest.java ########## @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.between; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.eq; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gt; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gte; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lte; + +/** */ +@RunWith(Parameterized.class) +public class MultifieldIndexQueryTest extends GridCommonAbstractTest { Review comment: Let's also add a test where only one field of the multifield index used for filtering (for both INDEX and DESC_INDEX). ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/MultiTableIndexQuery.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class MultiTableIndexQuery extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private IgniteCache cache; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignite crd = startGrids(4); + + cache = crd.cache(CACHE); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>() + .setName("TEST_CACHE") + .setIndexedTypes(Long.class, Person.class, Long.class, SecondPerson.class) + .setQueryParallelism(4); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** */ + @Test + public void testEmptyCache() { + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", Integer.MAX_VALUE)); + + QueryCursor<Cache.Entry<Long, Person>> cursor = cache.query(qry); + + assertTrue(cursor.getAll().isEmpty()); + } + + /** */ + @Test + public void testLtQuery() { + insertData(cache); + + int pivot = new Random().nextInt(CNT); + + IndexQuery<Long, SecondPerson> secQry = new IndexQuery<Long, SecondPerson>(SecondPerson.class) + .setCriteria(lt("id", CNT + pivot)); + + checkSecondPerson(cache.query(secQry), CNT, CNT + pivot); + } + + /** */ + private void insertData(IgniteCache cache) { + for (int i = 0; i < CNT; i++) { + cache.put((long) i, new Person(i)); + cache.put((long) CNT + i, new SecondPerson(CNT + i)); + } + } + + /** */ + private void checkPerson(QueryCursor<Cache.Entry<Long, Person>> cursor, int left, int right) { Review comment: There are no usages of this method. ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/MultifieldIndexQueryTest.java ########## @@ -0,0 +1,598 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.between; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.eq; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gt; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gte; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lte; + +/** */ +@RunWith(Parameterized.class) +public class MultifieldIndexQueryTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final String INDEX = "TEST_IDX"; + + /** */ + private static final String DESC_INDEX = "TEST_DESC_IDX"; + + /** */ + private static final int CNT = 10_000; + + /** */ + @Parameterized.Parameter(0) + public int nodesCnt; + + /** */ + private Ignite ignite; + + /** */ + private IgniteCache cache; + + /** */ + @Parameterized.Parameters(name = "nodesCnt={0}") + public static Collection<Object[]> testParams() { + return Arrays.asList( + new Object[] {1}, + new Object[] {2}); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite = startGrids(nodesCnt); + + cache = ignite.cache(CACHE); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>() + .setName("TEST_CACHE") + .setIndexedTypes(Long.class, Person.class) + .setQueryParallelism(1); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** */ + @Test + public void testQueryKeyPKIndex() { + insertData(); + + int pivot = new Random().nextInt(CNT); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class, "_key_PK") + .setCriteria(lt("_KEY", (long) pivot)); + + checkPerson(cache.query(qry), 0, pivot); + } + + /** */ + @Test + public void testEmptyCacheQuery() { + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", Integer.MAX_VALUE), lt("secId", Integer.MAX_VALUE)); + + QueryCursor<Cache.Entry<Long, Person>> cursor = cache.query(qry); + + assertTrue(cursor.getAll().isEmpty()); + + // Check same query with specify index name. + qry = new IndexQuery<Long, Person>(Person.class, INDEX) + .setCriteria(lt("id", Integer.MAX_VALUE), lt("secId", Integer.MAX_VALUE)); + + assertTrue(cache.query(qry).getAll().isEmpty()); + } + + /** */ + @Test + public void testCheckBoundaries() { + cache.put(1L, new Person(0, 1)); + cache.put(2L, new Person(1, 0)); + cache.put(3L, new Person(1, 1)); + + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(between("id", 0, 1), eq("secId", 1)); + + List<Cache.Entry<Long, Person>> result = cache.query(qry).getAll(); + + assertEquals(2, result.size()); + + result.sort(Comparator.comparingLong(Cache.Entry::getKey)); + + assertEquals(1L, (long) result.get(0).getKey()); + assertEquals(3L, (long) result.get(1).getKey()); + + assertEquals(new Person(0, 1), result.get(0).getValue()); + assertEquals(new Person(1, 1), result.get(1).getValue()); + } + + /** */ + @Test + public void testLtQueryMultipleField() { + insertData(); + + int pivot = new Random().nextInt(CNT); + + // Should return empty result for ID that less any inserted. + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", -1), lt("secId", pivot)); + + assertTrue(cache.query(qry).getAll().isEmpty()); + + // Should return all data for ID and SECID that greater any inserted. Review comment: Don't understand this comment (and the following comments in this method). Why `greater` if the criteria are `lt`? ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/MultiTableIndexQuery.java ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class MultiTableIndexQuery extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private IgniteCache cache; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Ignite crd = startGrids(4); + + cache = crd.cache(CACHE); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>() + .setName("TEST_CACHE") + .setIndexedTypes(Long.class, Person.class, Long.class, SecondPerson.class) + .setQueryParallelism(4); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** */ + @Test + public void testEmptyCache() { + IndexQuery<Long, Person> qry = new IndexQuery<Long, Person>(Person.class) + .setCriteria(lt("id", Integer.MAX_VALUE)); + + QueryCursor<Cache.Entry<Long, Person>> cursor = cache.query(qry); + + assertTrue(cursor.getAll().isEmpty()); + } + + /** */ + @Test + public void testLtQuery() { Review comment: Let's also add a test with "_key_PK" index query for both tables. ########## File path: modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQuerySqlIndexTest.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.query; + +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; + +/** */ +public class IndexQuerySqlIndexTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE = "TEST_CACHE"; + + /** */ + private static final String CACHE_TABLE = "TEST_CACHE_TABLE"; + + /** */ + private static final String TABLE = "TEST_TABLE"; + + /** */ + private static final String DESC_ID_IDX = "DESC_ID_IDX"; + + /** */ + private static final int CNT = 10_000; + + /** */ + private IgniteCache cache; + + /** */ + private Ignite crd; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + crd = startGrids(4); + + cache = crd.cache(CACHE); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration ccfg = new CacheConfiguration<>() Review comment: Let's generify (also some other tests, `MultifieldIndexQueryTest` for example) ########## File path: modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java ########## @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cache.query.index; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.IndexQuery; +import org.apache.ignite.cache.query.IndexQueryCriteria; +import org.apache.ignite.cache.query.IndexQueryCriteriaBuilder; +import org.apache.ignite.internal.cache.query.RangeIndexQueryCriteria; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRow; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator; +import org.apache.ignite.internal.cache.query.index.sorted.IndexSearchRowImpl; +import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler; +import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext; +import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.cache.query.index.SortOrder.DESC; + +/** + * Processor of {@link IndexQuery}. + */ +public class IndexQueryProcessor { + /** */ + private final IndexProcessor idxProc; + + /** */ + public IndexQueryProcessor(IndexProcessor idxProc) { + this.idxProc = idxProc; + } + + /** Run query on local node. */ + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocal( + GridCacheContext<K, V> cctx, IndexQueryDesc idxQryDesc, IndexQueryContext qryCtx, boolean keepBinary) + throws IgniteCheckedException { + + Index idx = index(cctx, idxQryDesc); + + if (idx == null) + throw new IgniteCheckedException( + "No index matches index query. Cache=" + cctx.name() + "; Qry=" + idxQryDesc); + + GridCursor<IndexRow> cursor = query(cctx, idx, idxQryDesc.criteria(), qryCtx); + + // Map IndexRow to Cache Key-Value pair. + return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + private IndexRow currVal; + + private final CacheObjectContext coctx = cctx.cacheObjectContext(); + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (currVal != null) + return true; + + if (!cursor.next()) + return false; + + currVal = cursor.get(); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteBiTuple<K, V> onNext() { + if (currVal == null) + if (!hasNext()) + throw new NoSuchElementException(); + + IndexRow row = currVal; + + currVal = null; + + K k = (K) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().key(), keepBinary, false); + V v = (V) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().value(), keepBinary, false); + + return new IgniteBiTuple<>(k, v); + } + }; + } + + /** Get index to run query by specified description. */ + private Index index(GridCacheContext cctx, IndexQueryDesc idxQryDesc) throws IgniteCheckedException { + Class<?> valCls = idxQryDesc.valCls() != null ? loadValClass(cctx, idxQryDesc.valCls()) : null; + + String tableName = cctx.kernalContext().query().tableName(cctx.name(), valCls); + + if (tableName == null) + return null; + + // Find index by specified name. + if (idxQryDesc.idxName() != null) { + String name = idxQryDesc.idxName(); + + if (!QueryUtils.PRIMARY_KEY_INDEX.equals(name)) + name = name.toUpperCase(); + + String schema = cctx.kernalContext().query().schemaName(cctx); + + IndexName idxName = new IndexName(cctx.name(), schema, tableName, name); + + Index idx = idxProc.index(idxName); + + if (idx == null) + return null; + + return checkIndex(idxProc.indexDefinition(idx.id()), idxQryDesc.criteria()) ? idx : null; + } + + // Try get index by list of fields to query. + // Check all indexes by cache to find best index match: count of index fields equals to count of index criteria fields. + Collection<Index> idxs = idxProc.indexes(cctx); + + Index idx = null; + int idxFieldsCnt = 0; + + for (Index i: idxs) { + IndexDefinition idxDef = idxProc.indexDefinition(i.id()); + + if (!tableName.equals(idxDef.idxName().tableName())) + continue; + + int fldsCnt = idxDef.indexKeyDefinitions().size(); + + if (checkIndex(idxDef, idxQryDesc.criteria())) { + if (idx == null) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else if (fldsCnt < idxFieldsCnt) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else continue; + + // Best match. Index query criteria matches full index. + if (idxQryDesc.criteria().fields().size() == idxDef.indexKeyDefinitions().size()) + break; + } + } + + return idx; + } + + /** Checks that specified index matches index query criteria. */ + private boolean checkIndex(IndexDefinition idxDef, IndexQueryCriteria criteria) { + if (criteria.fields().size() > idxDef.indexKeyDefinitions().size()) + return false; + + for (int i = 0; i < criteria.fields().size(); i++) { + if (!idxDef.indexKeyDefinitions().get(i).name().equalsIgnoreCase(criteria.fields().get(i))) + return false; + } + + return true; + } + + /** */ + private Class<?> loadValClass(GridCacheContext cctx, String valClsName) throws IgniteCheckedException { + try { + ClassLoader clsLdr = U.resolveClassLoader(cctx.kernalContext().config()); + + return clsLdr.loadClass(valClsName); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException("No cache serves class: " + valClsName); + } + } + + /** Runs a query and return single cursor or cursor over multiple index segments. */ + private GridCursor<IndexRow> query(GridCacheContext cctx, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + int segmentsCnt = cctx.isPartitioned() ? cctx.config().getQueryParallelism() : 1; + + if (segmentsCnt == 1) + return query(0, idx, criteria, qryCtx); + + final GridCursor<IndexRow>[] segments = new GridCursor[segmentsCnt]; + + // Actually it just traverse BPlusTree to find boundaries. It's too fast to parallelize this. + for (int i = 0; i < segmentsCnt; i++) + segments[i] = query(i, idx, criteria, qryCtx); + + return new SegmentedIndexCursor(segments, ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator()); + } + + /** Coordinate query criteria. */ + private GridCursor<IndexRow> query(int segment, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + if (criteria instanceof RangeIndexQueryCriteria) + return treeIndexRange((InlineIndex) idx, (RangeIndexQueryCriteria) criteria, segment, qryCtx); + + throw new IllegalStateException("Doesn't support index query criteria: " + criteria.getClass().getName()); + } + + /** + * Runs range query over specified segment. There are 2 steps to run query: + * 1. Traverse index by specified boundaries; + * 2. Scan over cursor and filter rows that doesn't match user criteria. + * + * Filtering is required in 2 cases: + * 1. Exclusion of one of boundaries, as idx.find() includes both of them; + * 2. To apply criteria on non-first index fields. Tree apply boundaries field by field, if first field match + * a boundary, then second field isn't checked within traversing. + */ + private GridCursor<IndexRow> treeIndexRange(InlineIndex idx, RangeIndexQueryCriteria criteria, int segment, + IndexQueryContext qryCtx) throws IgniteCheckedException { + + InlineIndexRowHandler hnd = idx.segment(0).rowHandler(); + CacheObjectContext coctx = idx.segment(0).cacheContext().cacheObjectContext(); + + IndexKey[] lowerBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + IndexKey[] upperBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + + boolean lowerAllNulls = true; + boolean upperAllNulls = true; + + List<RangeIndexQueryCriteria.RangeCriterion> treeCriteria = new ArrayList<>(); + + for (int i = 0; i < criteria.fields().size(); i++) { + String f = criteria.fields().get(i); + + IndexKeyDefinition def = hnd.indexKeyDefinitions().get(i); + + if (!def.name().equalsIgnoreCase(f)) + throw new IgniteCheckedException("Range query doesn't match index '" + idx.name() + "'"); + + RangeIndexQueryCriteria.RangeCriterion c = criteria.criteria().get(i); + + // If index is desc, then we need to swap boundaries as user declare criteria in straight manner. + // For example, there is an idx (int Val desc). It means that index stores data in reverse order (1 < 0). + // But user won't expect for criterion gt(1) to get 0 as result, instead user will use lt(1) for getting + // 0. Then we need to swap user criterion. + if (def.order().sortOrder() == DESC) + c = c.swap(); + + treeCriteria.add(c); + + IndexKey l = key(c.lower(), def, hnd.indexKeyTypeSettings(), coctx); + IndexKey u = key(c.upper(), def, hnd.indexKeyTypeSettings(), coctx); + + if (l != null) + lowerAllNulls = false; + + if (u != null) + upperAllNulls = false; + + lowerBounds[i] = l; + upperBounds[i] = u; + } + + IndexRow lower = lowerAllNulls ? null : new IndexSearchRowImpl(lowerBounds, hnd); + IndexRow upper = upperAllNulls ? null : new IndexSearchRowImpl(upperBounds, hnd); + + // Step 1. Traverse index. + GridCursor<IndexRow> findRes = idx.find(lower, upper, segment, qryCtx); + + // Step 2. Scan and filter. + return new GridCursor<IndexRow>() { + /** */ + private final IndexRowComparator rowCmp = ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator(); + + /** {@inheritDoc} */ + @Override public boolean next() throws IgniteCheckedException { + if (!findRes.next()) + return false; + + while (match(get(), lower, 1) || match(get(), upper, -1)) { + if (!findRes.next()) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public IndexRow get() throws IgniteCheckedException { + return findRes.get(); + } + + /** + * Matches index row, boundary and inclusion mask to decide whether this row will be excluded from result. + * + * @param row Result row to check. + * @param boundary Index search boundary. + * @param boundarySign {@code 1} for lower boundary and {@code -1} for upper boundary. + * @return {@code true} if specified row has to be excluded from result. + */ + private boolean match(IndexRow row, IndexRow boundary, int boundarySign) throws IgniteCheckedException { + // Unbounded search, include all. + if (boundary == null) + return false; + + int criteriaKeysCnt = treeCriteria.size(); + + for (int i = 0; i < criteriaKeysCnt; i++) { + RangeIndexQueryCriteria.RangeCriterion c = treeCriteria.get(i); + + // Include all values on this field. + if (boundary.key(i) == null) + continue; + + int cmp = rowCmp.compareKey(row, boundary, i); + + // Swap direction. + if (hnd.indexKeyDefinitions().get(i).order().sortOrder() == DESC) + cmp = -1 * cmp; + + // Exclude if field equals boundary field and criteria is excluding. + if (cmp == 0) { + if (boundarySign > 0 && !c.lowerIncl()) + return true; + + if (boundarySign < 0 && !c.upperIncl()) + return true; + } + + // Check sign. Exclude if field is out of boundaries. + if (cmp * boundarySign < 0) + return true; + } + + return false; + } + }; + } + + /** */ + private IndexKey key(Object val, IndexKeyDefinition def, IndexKeyTypeSettings settings, CacheObjectContext coctx) { + IndexKey key = null; + + if (val != null) { + if (val instanceof IndexQueryCriteriaBuilder.Null) + val = null; + + key = IndexKeyFactory.wrap( + val, def.idxType(), coctx, settings); + } + + return key; + } + + /** Single cursor over multiple segments. Next value is choose with the index row comparator. */ + private class SegmentedIndexCursor implements GridCursor<IndexRow> { + /** Cursors over segments. */ + private final GridCursor<IndexRow>[] cursors; + + /** Whether returns first value for user. */ + private boolean returnFirst; + + /** Offset of current segmented cursor to return value. */ + private int cursorOff; + + /** Comparator to compare index rows. */ + private final Comparator<GridCursor<IndexRow>> cursorComp; + + /** */ + SegmentedIndexCursor(GridCursor<IndexRow>[] cursors, IndexRowComparator rowCmp) { + this.cursors = cursors; + + cursorComp = new Comparator<GridCursor<IndexRow>>() { + @Override public int compare(GridCursor<IndexRow> o1, GridCursor<IndexRow> o2) { + try { + if (o1 == o2) + return 0; + + if (o1 == null) + return -1; + + if (o2 == null) + return 1; + + return rowCmp.compareKey(o1.get(), o2.get(), 0); + Review comment: Redundant NL ########## File path: modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java ########## @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.cache.query.index; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.IndexQuery; +import org.apache.ignite.cache.query.IndexQueryCriteria; +import org.apache.ignite.cache.query.IndexQueryCriteriaBuilder; +import org.apache.ignite.internal.cache.query.RangeIndexQueryCriteria; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRow; +import org.apache.ignite.internal.cache.query.index.sorted.IndexRowComparator; +import org.apache.ignite.internal.cache.query.index.sorted.IndexSearchRowImpl; +import org.apache.ignite.internal.cache.query.index.sorted.InlineIndexRowHandler; +import org.apache.ignite.internal.cache.query.index.sorted.SortedIndexDefinition; +import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext; +import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndex; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKey; +import org.apache.ignite.internal.cache.query.index.sorted.keys.IndexKeyFactory; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.CacheObjectUtils; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.cache.query.index.SortOrder.DESC; + +/** + * Processor of {@link IndexQuery}. + */ +public class IndexQueryProcessor { + /** */ + private final IndexProcessor idxProc; + + /** */ + public IndexQueryProcessor(IndexProcessor idxProc) { + this.idxProc = idxProc; + } + + /** Run query on local node. */ + public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocal( + GridCacheContext<K, V> cctx, IndexQueryDesc idxQryDesc, IndexQueryContext qryCtx, boolean keepBinary) + throws IgniteCheckedException { + + Index idx = index(cctx, idxQryDesc); + + if (idx == null) + throw new IgniteCheckedException( + "No index matches index query. Cache=" + cctx.name() + "; Qry=" + idxQryDesc); + + GridCursor<IndexRow> cursor = query(cctx, idx, idxQryDesc.criteria(), qryCtx); + + // Map IndexRow to Cache Key-Value pair. + return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + private IndexRow currVal; + + private final CacheObjectContext coctx = cctx.cacheObjectContext(); + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (currVal != null) + return true; + + if (!cursor.next()) + return false; + + currVal = cursor.get(); + + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteBiTuple<K, V> onNext() { + if (currVal == null) + if (!hasNext()) + throw new NoSuchElementException(); + + IndexRow row = currVal; + + currVal = null; + + K k = (K) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().key(), keepBinary, false); + V v = (V) CacheObjectUtils.unwrapBinaryIfNeeded(coctx, row.cacheDataRow().value(), keepBinary, false); + + return new IgniteBiTuple<>(k, v); + } + }; + } + + /** Get index to run query by specified description. */ + private Index index(GridCacheContext cctx, IndexQueryDesc idxQryDesc) throws IgniteCheckedException { + Class<?> valCls = idxQryDesc.valCls() != null ? loadValClass(cctx, idxQryDesc.valCls()) : null; + + String tableName = cctx.kernalContext().query().tableName(cctx.name(), valCls); + + if (tableName == null) + return null; + + // Find index by specified name. + if (idxQryDesc.idxName() != null) { + String name = idxQryDesc.idxName(); + + if (!QueryUtils.PRIMARY_KEY_INDEX.equals(name)) + name = name.toUpperCase(); + + String schema = cctx.kernalContext().query().schemaName(cctx); + + IndexName idxName = new IndexName(cctx.name(), schema, tableName, name); + + Index idx = idxProc.index(idxName); + + if (idx == null) + return null; + + return checkIndex(idxProc.indexDefinition(idx.id()), idxQryDesc.criteria()) ? idx : null; + } + + // Try get index by list of fields to query. + // Check all indexes by cache to find best index match: count of index fields equals to count of index criteria fields. + Collection<Index> idxs = idxProc.indexes(cctx); + + Index idx = null; + int idxFieldsCnt = 0; + + for (Index i: idxs) { + IndexDefinition idxDef = idxProc.indexDefinition(i.id()); + + if (!tableName.equals(idxDef.idxName().tableName())) + continue; + + int fldsCnt = idxDef.indexKeyDefinitions().size(); + + if (checkIndex(idxDef, idxQryDesc.criteria())) { + if (idx == null) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else if (fldsCnt < idxFieldsCnt) { + idx = i; + idxFieldsCnt = fldsCnt; + } + else continue; + + // Best match. Index query criteria matches full index. + if (idxQryDesc.criteria().fields().size() == idxDef.indexKeyDefinitions().size()) + break; + } + } + + return idx; + } + + /** Checks that specified index matches index query criteria. */ + private boolean checkIndex(IndexDefinition idxDef, IndexQueryCriteria criteria) { + if (criteria.fields().size() > idxDef.indexKeyDefinitions().size()) + return false; + + for (int i = 0; i < criteria.fields().size(); i++) { + if (!idxDef.indexKeyDefinitions().get(i).name().equalsIgnoreCase(criteria.fields().get(i))) + return false; + } + + return true; + } + + /** */ + private Class<?> loadValClass(GridCacheContext cctx, String valClsName) throws IgniteCheckedException { + try { + ClassLoader clsLdr = U.resolveClassLoader(cctx.kernalContext().config()); + + return clsLdr.loadClass(valClsName); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException("No cache serves class: " + valClsName); + } + } + + /** Runs a query and return single cursor or cursor over multiple index segments. */ + private GridCursor<IndexRow> query(GridCacheContext cctx, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + int segmentsCnt = cctx.isPartitioned() ? cctx.config().getQueryParallelism() : 1; + + if (segmentsCnt == 1) + return query(0, idx, criteria, qryCtx); + + final GridCursor<IndexRow>[] segments = new GridCursor[segmentsCnt]; + + // Actually it just traverse BPlusTree to find boundaries. It's too fast to parallelize this. + for (int i = 0; i < segmentsCnt; i++) + segments[i] = query(i, idx, criteria, qryCtx); + + return new SegmentedIndexCursor(segments, ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator()); + } + + /** Coordinate query criteria. */ + private GridCursor<IndexRow> query(int segment, Index idx, IndexQueryCriteria criteria, IndexQueryContext qryCtx) + throws IgniteCheckedException { + + if (criteria instanceof RangeIndexQueryCriteria) + return treeIndexRange((InlineIndex) idx, (RangeIndexQueryCriteria) criteria, segment, qryCtx); + + throw new IllegalStateException("Doesn't support index query criteria: " + criteria.getClass().getName()); + } + + /** + * Runs range query over specified segment. There are 2 steps to run query: + * 1. Traverse index by specified boundaries; + * 2. Scan over cursor and filter rows that doesn't match user criteria. + * + * Filtering is required in 2 cases: + * 1. Exclusion of one of boundaries, as idx.find() includes both of them; + * 2. To apply criteria on non-first index fields. Tree apply boundaries field by field, if first field match + * a boundary, then second field isn't checked within traversing. + */ + private GridCursor<IndexRow> treeIndexRange(InlineIndex idx, RangeIndexQueryCriteria criteria, int segment, + IndexQueryContext qryCtx) throws IgniteCheckedException { + + InlineIndexRowHandler hnd = idx.segment(0).rowHandler(); + CacheObjectContext coctx = idx.segment(0).cacheContext().cacheObjectContext(); + + IndexKey[] lowerBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + IndexKey[] upperBounds = new IndexKey[hnd.indexKeyDefinitions().size()]; + + boolean lowerAllNulls = true; + boolean upperAllNulls = true; + + List<RangeIndexQueryCriteria.RangeCriterion> treeCriteria = new ArrayList<>(); + + for (int i = 0; i < criteria.fields().size(); i++) { + String f = criteria.fields().get(i); + + IndexKeyDefinition def = hnd.indexKeyDefinitions().get(i); + + if (!def.name().equalsIgnoreCase(f)) + throw new IgniteCheckedException("Range query doesn't match index '" + idx.name() + "'"); + + RangeIndexQueryCriteria.RangeCriterion c = criteria.criteria().get(i); + + // If index is desc, then we need to swap boundaries as user declare criteria in straight manner. + // For example, there is an idx (int Val desc). It means that index stores data in reverse order (1 < 0). + // But user won't expect for criterion gt(1) to get 0 as result, instead user will use lt(1) for getting + // 0. Then we need to swap user criterion. + if (def.order().sortOrder() == DESC) + c = c.swap(); + + treeCriteria.add(c); + + IndexKey l = key(c.lower(), def, hnd.indexKeyTypeSettings(), coctx); + IndexKey u = key(c.upper(), def, hnd.indexKeyTypeSettings(), coctx); + + if (l != null) + lowerAllNulls = false; + + if (u != null) + upperAllNulls = false; + + lowerBounds[i] = l; + upperBounds[i] = u; + } + + IndexRow lower = lowerAllNulls ? null : new IndexSearchRowImpl(lowerBounds, hnd); + IndexRow upper = upperAllNulls ? null : new IndexSearchRowImpl(upperBounds, hnd); + + // Step 1. Traverse index. + GridCursor<IndexRow> findRes = idx.find(lower, upper, segment, qryCtx); + + // Step 2. Scan and filter. + return new GridCursor<IndexRow>() { + /** */ + private final IndexRowComparator rowCmp = ((SortedIndexDefinition) idxProc.indexDefinition(idx.id())).rowComparator(); + + /** {@inheritDoc} */ + @Override public boolean next() throws IgniteCheckedException { + if (!findRes.next()) + return false; + + while (match(get(), lower, 1) || match(get(), upper, -1)) { + if (!findRes.next()) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public IndexRow get() throws IgniteCheckedException { + return findRes.get(); + } + + /** + * Matches index row, boundary and inclusion mask to decide whether this row will be excluded from result. + * + * @param row Result row to check. + * @param boundary Index search boundary. + * @param boundarySign {@code 1} for lower boundary and {@code -1} for upper boundary. + * @return {@code true} if specified row has to be excluded from result. + */ + private boolean match(IndexRow row, IndexRow boundary, int boundarySign) throws IgniteCheckedException { + // Unbounded search, include all. + if (boundary == null) + return false; + + int criteriaKeysCnt = treeCriteria.size(); + + for (int i = 0; i < criteriaKeysCnt; i++) { + RangeIndexQueryCriteria.RangeCriterion c = treeCriteria.get(i); + + // Include all values on this field. + if (boundary.key(i) == null) + continue; + + int cmp = rowCmp.compareKey(row, boundary, i); + + // Swap direction. + if (hnd.indexKeyDefinitions().get(i).order().sortOrder() == DESC) + cmp = -1 * cmp; + + // Exclude if field equals boundary field and criteria is excluding. + if (cmp == 0) { + if (boundarySign > 0 && !c.lowerIncl()) + return true; + + if (boundarySign < 0 && !c.upperIncl()) + return true; + } + + // Check sign. Exclude if field is out of boundaries. + if (cmp * boundarySign < 0) + return true; + } + + return false; + } + }; + } + + /** */ + private IndexKey key(Object val, IndexKeyDefinition def, IndexKeyTypeSettings settings, CacheObjectContext coctx) { + IndexKey key = null; + + if (val != null) { + if (val instanceof IndexQueryCriteriaBuilder.Null) + val = null; + + key = IndexKeyFactory.wrap( + val, def.idxType(), coctx, settings); + } + + return key; + } + + /** Single cursor over multiple segments. Next value is choose with the index row comparator. */ + private class SegmentedIndexCursor implements GridCursor<IndexRow> { + /** Cursors over segments. */ + private final GridCursor<IndexRow>[] cursors; + + /** Whether returns first value for user. */ + private boolean returnFirst; + + /** Offset of current segmented cursor to return value. */ + private int cursorOff; + + /** Comparator to compare index rows. */ + private final Comparator<GridCursor<IndexRow>> cursorComp; + + /** */ + SegmentedIndexCursor(GridCursor<IndexRow>[] cursors, IndexRowComparator rowCmp) { + this.cursors = cursors; + + cursorComp = new Comparator<GridCursor<IndexRow>>() { + @Override public int compare(GridCursor<IndexRow> o1, GridCursor<IndexRow> o2) { + try { + if (o1 == o2) + return 0; + + if (o1 == null) + return -1; + + if (o2 == null) + return 1; + + return rowCmp.compareKey(o1.get(), o2.get(), 0); + + } catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }; + } + + /** {@inheritDoc} */ + @Override public boolean next() throws IgniteCheckedException { + if (!returnFirst) { + for (int i = 0; i < cursors.length; i++) { + if (!cursors[i].next()) { + cursors[i] = null; + cursorOff++; + } + } + + if (cursorOff == cursors.length) + return false; + + Arrays.sort(cursors, cursorComp); + + returnFirst = true; + + } else { Review comment: NL after `{` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
