Repository: ignite Updated Branches: refs/heads/master c141ded25 -> 870ecf89d
IGNITE-5982: GridMapQueryExecutor was split into several pieces. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e28d0d6c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e28d0d6c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e28d0d6c Branch: refs/heads/master Commit: e28d0d6cc617a4b0c7b0e4c4a5197b69f0c3e4bc Parents: 9da6938 Author: devozerov <voze...@gridgain.com> Authored: Tue Aug 8 15:16:58 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Tue Aug 8 15:16:58 2017 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 501 ++----------------- .../query/h2/twostep/MapNodeResults.java | 108 ++++ .../query/h2/twostep/MapQueryResult.java | 258 ++++++++++ .../query/h2/twostep/MapQueryResults.java | 155 ++++++ .../h2/twostep/MapReplicatedReservation.java | 38 ++ .../query/h2/twostep/MapRequestKey.java | 65 +++ .../query/h2/twostep/MapReservationKey.java | 73 +++ 7 files changed, 730 insertions(+), 468 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index fcf5f10..19b628b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep; -import java.lang.reflect.Field; import java.sql.Connection; import java.sql.ResultSet; import java.util.AbstractCollection; @@ -31,7 +30,6 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReferenceArray; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -39,7 +37,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; -import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; @@ -57,35 +54,29 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.QueryTable; -import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode; import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest; -import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.jdbc.JdbcResultSet; -import org.h2.result.ResultInterface; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; -import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; @@ -94,30 +85,13 @@ import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoin import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages; -import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; /** * Map query executor. */ +@SuppressWarnings("ForLoopReplaceableByForEach") public class GridMapQueryExecutor { /** */ - private static final Field RESULT_FIELD; - - /* - * Initialize. - */ - static { - try { - RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result"); - - RESULT_FIELD.setAccessible(true); - } - catch (NoSuchFieldException e) { - throw new IllegalStateException("Check H2 version in classpath.", e); - } - } - - /** */ private IgniteLogger log; /** */ @@ -127,14 +101,13 @@ public class GridMapQueryExecutor { private IgniteH2Indexing h2; /** */ - private ConcurrentMap<UUID, NodeResults> qryRess = new ConcurrentHashMap8<>(); + private ConcurrentMap<UUID, MapNodeResults> qryRess = new ConcurrentHashMap8<>(); /** */ private final GridSpinBusyLock busyLock; /** */ - private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable> reservations = - new ConcurrentHashMap8<>(); + private final ConcurrentMap<MapReservationKey, GridReservable> reservations = new ConcurrentHashMap8<>(); /** * @param busyLock Busy lock. @@ -162,7 +135,7 @@ public class GridMapQueryExecutor { GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId); - NodeResults nodeRess = qryRess.remove(nodeId); + MapNodeResults nodeRess = qryRess.remove(nodeId); if (nodeRess == null) return; @@ -172,6 +145,7 @@ public class GridMapQueryExecutor { }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { + @SuppressWarnings("deprecation") @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!busyLock.enterBusy()) return; @@ -228,7 +202,7 @@ public class GridMapQueryExecutor { private void onCancel(ClusterNode node, GridQueryCancelRequest msg) { long qryReqId = msg.queryRequestId(); - NodeResults nodeRess = resultsForNode(node.id()); + MapNodeResults nodeRess = resultsForNode(node.id()); boolean clear = GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP); @@ -245,13 +219,13 @@ public class GridMapQueryExecutor { * @param nodeId Node ID. * @return Results for node. */ - private NodeResults resultsForNode(UUID nodeId) { - NodeResults nodeRess = qryRess.get(nodeId); + private MapNodeResults resultsForNode(UUID nodeId) { + MapNodeResults nodeRess = qryRess.get(nodeId); if (nodeRess == null) { - nodeRess = new NodeResults(); + nodeRess = new MapNodeResults(); - NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess); + MapNodeResults old = qryRess.putIfAbsent(nodeId, nodeRess); if (old != null) nodeRess = old; @@ -300,13 +274,12 @@ public class GridMapQueryExecutor { continue; // For replicated cache topology version does not make sense. - final T2<String,AffinityTopologyVersion> grpKey = - new T2<>(cctx.name(), cctx.isReplicated() ? null : topVer); + final MapReservationKey grpKey = new MapReservationKey(cctx.name(), cctx.isReplicated() ? null : topVer); GridReservable r = reservations.get(grpKey); if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits. - if (r != ReplicatedReservation.INSTANCE) { + if (r != MapReplicatedReservation.INSTANCE) { if (!r.reserve()) return false; // We need explicit partitions here -> retry. @@ -327,7 +300,7 @@ public class GridMapQueryExecutor { } // Mark that we checked this replicated cache. - reservations.putIfAbsent(grpKey, ReplicatedReservation.INSTANCE); + reservations.putIfAbsent(grpKey, MapReplicatedReservation.INSTANCE); } } else { // Reserve primary partitions for partitioned cache (if no explicit given). @@ -381,6 +354,7 @@ public class GridMapQueryExecutor { return Collections.emptySet(); return new AbstractCollection<Integer>() { + @SuppressWarnings("NullableProblems") @Override public Iterator<Integer> iterator() { return new Iterator<Integer>() { /** */ @@ -537,9 +511,9 @@ public class GridMapQueryExecutor { GridCacheContext<?, ?> mainCctx = !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null; - NodeResults nodeRess = resultsForNode(node.id()); + MapNodeResults nodeRess = resultsForNode(node.id()); - QueryResults qr = null; + MapQueryResults qr = null; List<GridReservable> reserved = new ArrayList<>(); @@ -553,7 +527,7 @@ public class GridMapQueryExecutor { } } - qr = new QueryResults(reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null); + qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null); if (nodeRess.put(reqId, segmentId, qr) != null) throw new IllegalStateException(); @@ -619,7 +593,7 @@ public class GridMapQueryExecutor { rs = h2.executeSqlQueryWithTimer(conn, qry.query(), F.asList(qry.parameters(params)), true, timeout, - qr.cancels[qryIdx]); + qr.queryCancel(qryIdx)); if (evt) { assert mainCctx != null; @@ -644,7 +618,7 @@ public class GridMapQueryExecutor { qr.addResult(qryIdx, qry, node.id(), rs, params); - if (qr.canceled) { + if (qr.cancelled()) { qr.result(qryIdx).close(); throw new QueryCancelledException(); @@ -724,7 +698,7 @@ public class GridMapQueryExecutor { * @param req Request. */ private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) { - NodeResults nodeRess = qryRess.get(node.id()); + MapNodeResults nodeRess = qryRess.get(node.id()); if (nodeRess == null) { sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req)); @@ -736,11 +710,11 @@ public class GridMapQueryExecutor { return; } - QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId()); + MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId()); if (qr == null) sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req)); - else if (qr.canceled) + else if (qr.cancelled()) sendError(node, req.queryRequestId(), new QueryCancelledException()); else sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize()); @@ -754,16 +728,16 @@ public class GridMapQueryExecutor { * @param segmentId Index segment ID. * @param pageSize Page size. */ - private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId, + private void sendNextPage(MapNodeResults nodeRess, ClusterNode node, MapQueryResults qr, int qry, int segmentId, int pageSize) { - QueryResult res = qr.result(qry); + MapQueryResult res = qr.result(qry); assert res != null; - if (res.closed) + if (res.closed()) return; - int page = res.page; + int page = res.page(); List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize)); @@ -773,16 +747,16 @@ public class GridMapQueryExecutor { res.close(); if (qr.isAllClosed()) - nodeRess.remove(qr.qryReqId, segmentId, qr); + nodeRess.remove(qr.queryRequestId(), segmentId, qr); } try { boolean loc = node.isLocal(); - GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page, - page == 0 ? res.rowCnt : -1, - res.cols, - loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)), + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.queryRequestId(), segmentId, qry, page, + page == 0 ? res.rowCount() : -1, + res.columnCount(), + loc ? null : toMessages(rows, new ArrayList<Message>(res.columnCount())), loc ? rows : null); if (loc) @@ -828,418 +802,9 @@ public class GridMapQueryExecutor { */ public void onCacheStop(String cacheName) { // Drop group reservations. - for (T2<String,AffinityTopologyVersion> grpKey : reservations.keySet()) { - if (F.eq(grpKey.get1(), cacheName)) + for (MapReservationKey grpKey : reservations.keySet()) { + if (F.eq(grpKey.cacheName(), cacheName)) reservations.remove(grpKey); } } - - - /** - * - */ - private static class NodeResults { - /** */ - private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>(); - - /** */ - private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist = - new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); - - /** - * @param reqId Query Request ID. - * @return {@code False} if query was already cancelled. - */ - boolean cancelled(long reqId) { - return qryHist.get(reqId) != null; - } - - /** - * @param reqId Query Request ID. - * @return {@code True} if cancelled. - */ - boolean onCancel(long reqId) { - Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE); - - return old == null; - } - - /** - * @param reqId Query Request ID. - * @param segmentId Index segment ID. - * @return query partial results. - */ - public QueryResults get(long reqId, int segmentId) { - return res.get(new RequestKey(reqId, segmentId)); - } - - /** - * Cancel all thread of given request. - * @param reqID Request ID. - */ - public void cancelRequest(long reqID) { - for (RequestKey key : res.keySet()) { - if (key.reqId == reqID) { - QueryResults removed = res.remove(key); - - if (removed != null) - removed.cancel(true); - } - - } - } - - /** - * @param reqId Query Request ID. - * @param segmentId Index segment ID. - * @param qr Query Results. - * @return {@code True} if removed. - */ - public boolean remove(long reqId, int segmentId, QueryResults qr) { - return res.remove(new RequestKey(reqId, segmentId), qr); - } - - /** - * @param reqId Query Request ID. - * @param segmentId Index segment ID. - * @param qr Query Results. - * @return previous value. - */ - public QueryResults put(long reqId, int segmentId, QueryResults qr) { - return res.put(new RequestKey(reqId, segmentId), qr); - } - - /** - * Cancel all node queries. - */ - public void cancelAll() { - for (QueryResults ress : res.values()) - ress.cancel(true); - } - - /** - * - */ - private static class RequestKey { - /** */ - private long reqId; - - /** */ - private int segmentId; - - /** Constructor */ - RequestKey(long reqId, int segmentId) { - this.reqId = reqId; - this.segmentId = segmentId; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - RequestKey other = (RequestKey)o; - - return reqId == other.reqId && segmentId == other.segmentId; - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = (int)(reqId ^ (reqId >>> 32)); - result = 31 * result + segmentId; - return result; - } - } - } - - /** - * - */ - private class QueryResults { - /** */ - private final long qryReqId; - - /** */ - private final AtomicReferenceArray<QueryResult> results; - - /** */ - private final GridQueryCancel[] cancels; - - /** */ - private final String cacheName; - - /** */ - private volatile boolean canceled; - - /** - * @param qryReqId Query request ID. - * @param qrys Number of queries. - * @param cacheName Cache name. - */ - @SuppressWarnings("unchecked") - private QueryResults(long qryReqId, int qrys, @Nullable String cacheName) { - this.qryReqId = qryReqId; - this.cacheName = cacheName; - - results = new AtomicReferenceArray<>(qrys); - cancels = new GridQueryCancel[qrys]; - - for (int i = 0; i < cancels.length; i++) - cancels[i] = new GridQueryCancel(); - } - - /** - * @param qry Query result index. - * @return Query result. - */ - QueryResult result(int qry) { - return results.get(qry); - } - - /** - * @param qry Query result index. - * @param q Query object. - * @param qrySrcNodeId Query source node. - * @param rs Result set. - */ - void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) { - if (!results.compareAndSet(qry, null, new QueryResult(rs, ctx, cacheName, qrySrcNodeId, q, params))) - throw new IllegalStateException(); - } - - /** - * @return {@code true} If all results are closed. - */ - boolean isAllClosed() { - for (int i = 0; i < results.length(); i++) { - QueryResult res = results.get(i); - - if (res == null || !res.closed) - return false; - } - - return true; - } - - /** - * Cancels the query. - */ - void cancel(boolean forceQryCancel) { - if (canceled) - return; - - canceled = true; - - for (int i = 0; i < results.length(); i++) { - QueryResult res = results.get(i); - - if (res != null) { - res.close(); - - continue; - } - - if (forceQryCancel) { - GridQueryCancel cancel = cancels[i]; - - if (cancel != null) - cancel.cancel(); - } - } - } - } - - /** - * Result for a single part of the query. - */ - private class QueryResult implements AutoCloseable { - /** */ - private final ResultInterface res; - - /** */ - private final ResultSet rs; - - /** Kernal context. */ - private final GridKernalContext ctx; - - /** */ - private final String cacheName; - - /** */ - private final GridCacheSqlQuery qry; - - /** */ - private final UUID qrySrcNodeId; - - /** */ - private final int cols; - - /** */ - private int page; - - /** */ - private final int rowCnt; - - /** */ - private boolean cpNeeded; - - /** */ - private volatile boolean closed; - - /** */ - private final Object[] params; - - /** - * @param rs Result set. - * @param ctx Kernal context. - * @param cacheName Cache name. - * @param qrySrcNodeId Query source node. - * @param qry Query. - * @param params Query params. - */ - private QueryResult(ResultSet rs, GridKernalContext ctx, @Nullable String cacheName, - UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) { - this.ctx = ctx; - this.cacheName = cacheName; - this.qry = qry; - this.params = params; - this.qrySrcNodeId = qrySrcNodeId; - this.cpNeeded = F.eq(ctx.localNodeId(), qrySrcNodeId); - - if (rs != null) { - this.rs = rs; - try { - res = (ResultInterface)RESULT_FIELD.get(rs); - } - catch (IllegalAccessException e) { - throw new IllegalStateException(e); // Must not happen. - } - - rowCnt = res.getRowCount(); - cols = res.getVisibleColumnCount(); - } - else { - this.rs = null; - this.res = null; - this.cols = -1; - this.rowCnt = -1; - - closed = true; - } - } - - /** - * @param rows Collection to fetch into. - * @param pageSize Page size. - * @return {@code true} If there are no more rows available. - */ - synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) { - if (closed) - return true; - - boolean readEvt = cacheName != null && ctx.event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - - page++; - - for (int i = 0 ; i < pageSize; i++) { - if (!res.next()) - return true; - - Value[] row = res.currentRow(); - - if (cpNeeded) { - boolean copied = false; - - for (int j = 0; j < row.length; j++) { - Value val = row[j]; - - if (val instanceof GridH2ValueCacheObject) { - GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val; - - row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) { - @Override public Object getObject() { - return getObject(true); - } - }; - - copied = true; - } - } - - if (i == 0 && !copied) - cpNeeded = false; // No copy on read caches, skip next checks. - } - - assert row != null; - - if (readEvt) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "SQL fields query result set row read.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.SQL.name(), - cacheName, - null, - qry.query(), - null, - null, - params, - qrySrcNodeId, - null, - null, - null, - null, - row(row))); - } - - rows.add(res.currentRow()); - } - - return false; - } - - /** - * @param row Values array row. - * @return Objects list row. - */ - private List<?> row(Value[] row) { - List<Object> res = new ArrayList<>(row.length); - - for (Value v : row) - res.add(v.getObject()); - - return res; - } - - /** {@inheritDoc} */ - @Override public synchronized void close() { - if (closed) - return; - - closed = true; - - U.close(rs, log); - } - } - - /** - * Fake reservation object for replicated caches. - */ - private static class ReplicatedReservation implements GridReservable { - /** */ - static final ReplicatedReservation INSTANCE = new ReplicatedReservation(); - - /** {@inheritDoc} */ - @Override public boolean reserve() { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public void release() { - throw new IllegalStateException(); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java new file mode 100644 index 0000000..d5ea357 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; +import org.jsr166.ConcurrentHashMap8; + +import java.util.concurrent.ConcurrentMap; + +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; + +/** + * Mapper node results. + */ +class MapNodeResults { + /** */ + private final ConcurrentMap<MapRequestKey, MapQueryResults> res = new ConcurrentHashMap8<>(); + + /** */ + private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist = + new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q); + + /** + * @param reqId Query Request ID. + * @return {@code False} if query was already cancelled. + */ + boolean cancelled(long reqId) { + return qryHist.get(reqId) != null; + } + + /** + * @param reqId Query Request ID. + * @return {@code True} if cancelled. + */ + boolean onCancel(long reqId) { + Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE); + + return old == null; + } + + /** + * @param reqId Query Request ID. + * @param segmentId Index segment ID. + * @return query partial results. + */ + public MapQueryResults get(long reqId, int segmentId) { + return res.get(new MapRequestKey(reqId, segmentId)); + } + + /** + * Cancel all thread of given request. + * @param reqId Request ID. + */ + public void cancelRequest(long reqId) { + for (MapRequestKey key : res.keySet()) { + if (key.requestId() == reqId) { + MapQueryResults removed = res.remove(key); + + if (removed != null) + removed.cancel(true); + } + } + } + + /** + * @param reqId Query Request ID. + * @param segmentId Index segment ID. + * @param qr Query Results. + * @return {@code True} if removed. + */ + public boolean remove(long reqId, int segmentId, MapQueryResults qr) { + return res.remove(new MapRequestKey(reqId, segmentId), qr); + } + + /** + * @param reqId Query Request ID. + * @param segmentId Index segment ID. + * @param qr Query Results. + * @return previous value. + */ + public MapQueryResults put(long reqId, int segmentId, MapQueryResults qr) { + return res.put(new MapRequestKey(reqId, segmentId), qr); + } + + /** + * Cancel all node queries. + */ + public void cancelAll() { + for (MapQueryResults ress : res.values()) + ress.cancel(true); + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java new file mode 100644 index 0000000..4799e03 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.events.CacheQueryReadEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.h2.jdbc.JdbcResultSet; +import org.h2.result.ResultInterface; +import org.h2.value.Value; +import org.jetbrains.annotations.Nullable; + +import java.lang.reflect.Field; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; + +/** + * Mapper result for a single part of the query. + */ +class MapQueryResult implements AutoCloseable { + /** */ + private static final Field RESULT_FIELD; + + /* + * Initialize. + */ + static { + try { + RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result"); + + RESULT_FIELD.setAccessible(true); + } + catch (NoSuchFieldException e) { + throw new IllegalStateException("Check H2 version in classpath.", e); + } + } + + /** Indexing. */ + private final IgniteH2Indexing h2; + + /** */ + private final ResultInterface res; + + /** */ + private final ResultSet rs; + + /** */ + private final String cacheName; + + /** */ + private final GridCacheSqlQuery qry; + + /** */ + private final UUID qrySrcNodeId; + + /** */ + private final int cols; + + /** */ + private int page; + + /** */ + private final int rowCnt; + + /** */ + private boolean cpNeeded; + + /** */ + private volatile boolean closed; + + /** */ + private final Object[] params; + + /** + * @param rs Result set. + * @param cacheName Cache name. + * @param qrySrcNodeId Query source node. + * @param qry Query. + * @param params Query params. + */ + MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName, + UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) { + this.h2 = h2; + this.cacheName = cacheName; + this.qry = qry; + this.params = params; + this.qrySrcNodeId = qrySrcNodeId; + this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId); + + if (rs != null) { + this.rs = rs; + try { + res = (ResultInterface)RESULT_FIELD.get(rs); + } + catch (IllegalAccessException e) { + throw new IllegalStateException(e); // Must not happen. + } + + rowCnt = res.getRowCount(); + cols = res.getVisibleColumnCount(); + } + else { + this.rs = null; + this.res = null; + this.cols = -1; + this.rowCnt = -1; + + closed = true; + } + } + + /** + * @return Page number. + */ + int page() { + return page; + } + + /** + * @return Row count. + */ + int rowCount() { + return rowCnt; + } + + /** + * @return Column ocunt. + */ + int columnCount() { + return cols; + } + + /** + * @return Closed flag. + */ + boolean closed() { + return closed; + } + + /** + * @param rows Collection to fetch into. + * @param pageSize Page size. + * @return {@code true} If there are no more rows available. + */ + synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) { + if (closed) + return true; + + boolean readEvt = cacheName != null && h2.kernalContext().event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + + page++; + + for (int i = 0 ; i < pageSize; i++) { + if (!res.next()) + return true; + + Value[] row = res.currentRow(); + + if (cpNeeded) { + boolean copied = false; + + for (int j = 0; j < row.length; j++) { + Value val = row[j]; + + if (val instanceof GridH2ValueCacheObject) { + GridH2ValueCacheObject valCacheObj = (GridH2ValueCacheObject)val; + + row[j] = new GridH2ValueCacheObject(valCacheObj.getCacheObject(), h2.objectContext()) { + @Override public Object getObject() { + return getObject(true); + } + }; + + copied = true; + } + } + + if (i == 0 && !copied) + cpNeeded = false; // No copy on read caches, skip next checks. + } + + assert row != null; + + if (readEvt) { + GridKernalContext ctx = h2.kernalContext(); + + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "SQL fields query result set row read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SQL.name(), + cacheName, + null, + qry.query(), + null, + null, + params, + qrySrcNodeId, + null, + null, + null, + null, + row(row))); + } + + rows.add(res.currentRow()); + } + + return false; + } + + /** + * @param row Values array row. + * @return Objects list row. + */ + private List<?> row(Value[] row) { + List<Object> res = new ArrayList<>(row.length); + + for (Value v : row) + res.add(v.getObject()); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized void close() { + if (closed) + return; + + closed = true; + + U.closeQuiet(rs); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java new file mode 100644 index 0000000..7ad1d14 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.query.GridQueryCancel; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.jetbrains.annotations.Nullable; + +import java.sql.ResultSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReferenceArray; + +/** + * Mapper query results. + */ +class MapQueryResults { + /** H@ indexing. */ + private final IgniteH2Indexing h2; + + /** */ + private final long qryReqId; + + /** */ + private final AtomicReferenceArray<MapQueryResult> results; + + /** */ + private final GridQueryCancel[] cancels; + + /** */ + private final String cacheName; + + /** */ + private volatile boolean cancelled; + + /** + * @param qryReqId Query request ID. + * @param qrys Number of queries. + * @param cacheName Cache name. + */ + @SuppressWarnings("unchecked") + MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, + @Nullable String cacheName) { + this.h2 = h2; + this.qryReqId = qryReqId; + this.cacheName = cacheName; + + results = new AtomicReferenceArray<>(qrys); + cancels = new GridQueryCancel[qrys]; + + for (int i = 0; i < cancels.length; i++) + cancels[i] = new GridQueryCancel(); + } + + /** + * @param qry Query result index. + * @return Query result. + */ + MapQueryResult result(int qry) { + return results.get(qry); + } + + /** + * Get cancel token for query. + * + * @param qryIdx Query index. + * @return Cancel token. + */ + GridQueryCancel queryCancel(int qryIdx) { + return cancels[qryIdx]; + } + + /** + * @param qry Query result index. + * @param q Query object. + * @param qrySrcNodeId Query source node. + * @param rs Result set. + */ + void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) { + MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params); + + if (!results.compareAndSet(qry, null, res)) + throw new IllegalStateException(); + } + + /** + * @return {@code true} If all results are closed. + */ + boolean isAllClosed() { + for (int i = 0; i < results.length(); i++) { + MapQueryResult res = results.get(i); + + if (res == null || !res.closed()) + return false; + } + + return true; + } + + /** + * Cancels the query. + */ + void cancel(boolean forceQryCancel) { + if (cancelled) + return; + + cancelled = true; + + for (int i = 0; i < results.length(); i++) { + MapQueryResult res = results.get(i); + + if (res != null) { + res.close(); + + continue; + } + + if (forceQryCancel) { + GridQueryCancel cancel = cancels[i]; + + if (cancel != null) + cancel.cancel(); + } + } + } + + /** + * @return Cancel flag. + */ + boolean cancelled() { + return cancelled; + } + + /** + * @return Query request ID. + */ + long queryRequestId() { + return qryReqId; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java new file mode 100644 index 0000000..dd8237b --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReplicatedReservation.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; + +/** + * Mapper fake reservation object for replicated caches. + */ +class MapReplicatedReservation implements GridReservable { + /** */ + static final MapReplicatedReservation INSTANCE = new MapReplicatedReservation(); + + /** {@inheritDoc} */ + @Override public boolean reserve() { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public void release() { + throw new IllegalStateException(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java new file mode 100644 index 0000000..6feb8ea --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +/** + * Mapper request key. + */ +class MapRequestKey { + /** */ + private long reqId; + + /** */ + private int segmentId; + + /** Constructor */ + MapRequestKey(long reqId, int segmentId) { + this.reqId = reqId; + this.segmentId = segmentId; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MapRequestKey other = (MapRequestKey)o; + + return reqId == other.reqId && segmentId == other.segmentId; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = (int)(reqId ^ (reqId >>> 32)); + + res = 31 * res + segmentId; + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e28d0d6c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java new file mode 100644 index 0000000..9d2d7ba --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapReservationKey.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.F; + +/** + * Mapper reservation key. + */ +public class MapReservationKey { + /** Cache name. */ + private final String cacheName; + + /** Topology version. */ + private final AffinityTopologyVersion topVer; + + /** + * Constructor. + * + * @param cacheName Cache name. + * @param topVer Topology version. + */ + public MapReservationKey(String cacheName, AffinityTopologyVersion topVer) { + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MapReservationKey other = (MapReservationKey)o; + + return F.eq(cacheName, other.cacheName) && F.eq(topVer, other.topVer); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = cacheName != null ? cacheName.hashCode() : 0; + + res = 31 * res + (topVer != null ? topVer.hashCode() : 0); + + return res; + } +}