http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 8facd32..7dda67f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -33,10 +33,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import javax.cache.configuration.Factory; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.BulkLoadContextCursor; import org.apache.ignite.cache.query.FieldsQueryCursor; -import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteVersionUtils; @@ -56,9 +56,11 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.SqlClientContext; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; @@ -105,6 +107,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { /** Busy lock. */ private final GridSpinBusyLock busyLock; + /** Worker. */ + private final JdbcRequestHandlerWorker worker; + /** Maximum allowed cursors. */ private final int maxCursors; @@ -126,6 +131,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { /** Automatic close of cursors. */ private final boolean autoCloseCursors; + /** Nested transactions handling mode. */ + private final NestedTxMode nestedTxMode; + /** Protocol version. */ private ClientListenerProtocolVersion protocolVer; @@ -151,7 +159,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, JdbcResponseSender sender, int maxCursors, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly, - boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate, + boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate, NestedTxMode nestedTxMode, AuthorizationContext actx, ClientListenerProtocolVersion protocolVer) { this.ctx = ctx; this.sender = sender; @@ -176,10 +184,16 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { this.busyLock = busyLock; this.maxCursors = maxCursors; this.autoCloseCursors = autoCloseCursors; + this.nestedTxMode = nestedTxMode; this.protocolVer = protocolVer; this.actx = actx; log = ctx.log(getClass()); + + if (ctx.grid().configuration().isMvccEnabled()) + worker = new JdbcRequestHandlerWorker(ctx.igniteInstanceName(), log, this, ctx); + else + worker = null; } /** {@inheritDoc} */ @@ -190,6 +204,34 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { JdbcRequest req = (JdbcRequest)req0; + if (worker == null) + return doHandle(req); + else { + GridFutureAdapter<ClientListenerResponse> fut = worker.process(req); + + try { + return fut.get(); + } + catch (IgniteCheckedException e) { + return exceptionToResult(e); + } + } + } + + /** + * Start worker, if it's present. + */ + void start() { + if (worker != null) + worker.start(); + } + + /** + * Actually handle the request. + * @param req Request. + * @return Request handling result. + */ + ClientListenerResponse doHandle(JdbcRequest req) { if (!busyLock.enterBusy()) return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Failed to handle JDBC request because node is stopping."); @@ -364,6 +406,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { public void onDisconnect() { if (busyLock.enterBusy()) { + if (worker != null) { + worker.cancel(); + + try { + worker.join(); + } + catch (InterruptedException e) { + // No-op. + } + } + try { for (JdbcQueryCursor cursor : qryCursors.values()) @@ -411,11 +464,11 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { try { String sql = req.sqlQuery(); - SqlFieldsQuery qry; + SqlFieldsQueryEx qry; switch(req.expectedStatementType()) { case ANY_STATEMENT_TYPE: - qry = new SqlFieldsQuery(sql); + qry = new SqlFieldsQueryEx(sql, null); break; @@ -440,6 +493,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { qry.setCollocated(cliCtx.isCollocated()); qry.setReplicatedOnly(cliCtx.isReplicatedOnly()); qry.setLazy(cliCtx.isLazy()); + qry.setNestedTxMode(nestedTxMode); + qry.setAutoCommit(req.autoCommit()); if (req.pageSize() <= 0) return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Invalid fetch size: " + req.pageSize()); @@ -656,6 +711,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { qry.setCollocated(cliCtx.isCollocated()); qry.setReplicatedOnly(cliCtx.isReplicatedOnly()); qry.setLazy(cliCtx.isLazy()); + qry.setNestedTxMode(nestedTxMode); + qry.setAutoCommit(req.autoCommit()); qry.setSchema(schemaName); }
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandlerWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandlerWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandlerWorker.java new file mode 100644 index 0000000..7211787 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandlerWorker.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener; +import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; + +/** + * JDBC request handler worker to maintain single threaded transactional execution of SQL statements when MVCC is on.<p> + * This worker is intended for internal use as a temporary solution and from within {@link JdbcRequestHandler}, + * therefore it does not do any fine-grained lifecycle handling as it relies on existing guarantees from + * {@link ClientListenerNioListener}. + */ +class JdbcRequestHandlerWorker extends GridWorker { + /** Requests queue.*/ + private final LinkedBlockingQueue<T2<JdbcRequest, GridFutureAdapter<ClientListenerResponse>>> queue = + new LinkedBlockingQueue<>(); + + /** Handler.*/ + private final JdbcRequestHandler hnd; + + /** Context.*/ + private final GridKernalContext ctx; + + /** Response */ + private final static ClientListenerResponse ERR_RESPONSE = new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, + "Connection closed."); + + /** + * Constructor. + * @param igniteInstanceName Instance name. + * @param log Logger. + * @param hnd Handler. + * @param ctx Kernal context. + */ + JdbcRequestHandlerWorker(@Nullable String igniteInstanceName, IgniteLogger log, JdbcRequestHandler hnd, + GridKernalContext ctx) { + super(igniteInstanceName, "jdbc-request-handler-worker", log); + + A.notNull(hnd, "hnd"); + + this.hnd = hnd; + + this.ctx = ctx; + } + + /** + * Start this worker. + */ + void start() { + new IgniteThread(this).start(); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + try { + while (!isCancelled()) { + T2<JdbcRequest, GridFutureAdapter<ClientListenerResponse>> req = queue.take(); + + GridFutureAdapter<ClientListenerResponse> fut = req.get2(); + + try { + ClientListenerResponse res = hnd.doHandle(req.get1()); + + fut.onDone(res); + } + catch (Exception e) { + fut.onDone(e); + } + } + } + finally { + // Notify indexing that this worker is being stopped. + try { + ctx.query().getIndexing().onClientDisconnect(); + } + catch (Exception e) { + // No-op. + } + + // Drain the queue on stop. + T2<JdbcRequest, GridFutureAdapter<ClientListenerResponse>> req = queue.poll(); + + while (req != null) { + req.get2().onDone(ERR_RESPONSE); + + req = queue.poll(); + } + } + } + + /** + * Initiate request processing. + * @param req Request. + * @return Future to track request processing. + */ + GridFutureAdapter<ClientListenerResponse> process(JdbcRequest req) { + GridFutureAdapter<ClientListenerResponse> fut = new GridFutureAdapter<>(); + + queue.add(new T2<>(req, fut)); + + return fut; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java index 253b183..2b51741 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.odbc.odbc; -import java.util.HashSet; -import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryReaderExImpl; @@ -27,8 +25,12 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerAbstractConnecti import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; +import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.util.GridSpinBusyLock; +import java.util.HashSet; +import java.util.Set; + /** * ODBC Connection Context. */ @@ -45,7 +47,7 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte /** Version 2.3.2: added multiple statements support. */ public static final ClientListenerProtocolVersion VER_2_3_2 = ClientListenerProtocolVersion.create(2, 3, 2); - /** Version 2.5.0: added authentication. */ + /** Version 2.5.0: added authentication and transactions. */ public static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0); /** Version 2.7.0: added precision and scale. */ @@ -111,6 +113,7 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte boolean enforceJoinOrder = reader.readBoolean(); boolean replicatedOnly = reader.readBoolean(); boolean collocated = reader.readBoolean(); + boolean lazy = false; if (ver.compareTo(VER_2_1_5) >= 0) @@ -124,17 +127,25 @@ public class OdbcConnectionContext extends ClientListenerAbstractConnectionConte String user = null; String passwd = null; + NestedTxMode nestedTxMode = NestedTxMode.DEFAULT; + if (ver.compareTo(VER_2_5_0) >= 0) { user = reader.readString(); passwd = reader.readString(); + + byte nestedTxModeVal = reader.readByte(); + + nestedTxMode = NestedTxMode.fromByte(nestedTxModeVal); } AuthorizationContext actx = authenticate(user, passwd); - handler = new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, - enforceJoinOrder, replicatedOnly, collocated, lazy, skipReducerOnUpdate, actx, ver); + handler = new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder, + replicatedOnly, collocated, lazy, skipReducerOnUpdate, actx, nestedTxMode, ver); parser = new OdbcMessageParser(ctx, ver); + + handler.start(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java index c2137bd..552841d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java @@ -47,7 +47,7 @@ public class OdbcMessageParser implements ClientListenerMessageParser { protected static final int INIT_CAP = 1024; /** Kernal context. */ - protected GridKernalContext ctx; + protected final GridKernalContext ctx; /** Logger. */ private final IgniteLogger log; @@ -101,7 +101,12 @@ public class OdbcMessageParser implements ClientListenerMessageParser { if (ver.compareTo(OdbcConnectionContext.VER_2_3_2) >= 0) timeout = reader.readInt(); - res = new OdbcQueryExecuteRequest(schema, sql, params, timeout); + boolean autoCommit = true; + + if (ver.compareTo(OdbcConnectionContext.VER_2_5_0) >= 0) + autoCommit = reader.readBoolean(); + + res = new OdbcQueryExecuteRequest(schema, sql, params, timeout, autoCommit); break; } @@ -123,7 +128,12 @@ public class OdbcMessageParser implements ClientListenerMessageParser { if (ver.compareTo(OdbcConnectionContext.VER_2_3_2) >= 0) timeout = reader.readInt(); - res = new OdbcQueryExecuteBatchRequest(schema, sql, last, params, timeout); + boolean autoCommit = true; + + if (ver.compareTo(OdbcConnectionContext.VER_2_5_0) >= 0) + autoCommit = reader.readBoolean(); + + res = new OdbcQueryExecuteBatchRequest(schema, sql, last, params, timeout, autoCommit); break; } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchRequest.java index 0e4effd..75c2831 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchRequest.java @@ -41,6 +41,10 @@ public class OdbcQueryExecuteBatchRequest extends OdbcRequest { @GridToStringExclude private final Object[][] args; + /** Autocommit flag. */ + @GridToStringInclude + private final boolean autoCommit; + /** Query timeout in seconds. */ @GridToStringInclude private final int timeout; @@ -51,9 +55,10 @@ public class OdbcQueryExecuteBatchRequest extends OdbcRequest { * @param last Last page flag. * @param args Arguments list. * @param timeout Timeout in seconds. + * @param autoCommit Autocommit flag. */ public OdbcQueryExecuteBatchRequest(@Nullable String schema, String sqlQry, boolean last, Object[][] args, - int timeout) { + int timeout, boolean autoCommit) { super(QRY_EXEC_BATCH); assert sqlQry != null : "SQL query should not be null"; @@ -64,6 +69,7 @@ public class OdbcQueryExecuteBatchRequest extends OdbcRequest { this.last = last; this.args = args; this.timeout = timeout; + this.autoCommit = autoCommit; } /** @@ -106,4 +112,11 @@ public class OdbcQueryExecuteBatchRequest extends OdbcRequest { @Override public String toString() { return S.toString(OdbcQueryExecuteBatchRequest.class, this, "args", args, true); } + + /** + * @return Autocommit flag. + */ + public boolean autoCommit() { + return autoCommit; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java index 1fde908..7034b86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java @@ -37,6 +37,10 @@ public class OdbcQueryExecuteRequest extends OdbcRequest { @GridToStringExclude private final Object[] args; + /** Autocommit flag. */ + @GridToStringInclude + private final boolean autoCommit; + /** Query timeout in seconds. */ @GridToStringInclude private final int timeout; @@ -47,7 +51,8 @@ public class OdbcQueryExecuteRequest extends OdbcRequest { * @param args Arguments list. * @param timeout Timeout in seconds. */ - public OdbcQueryExecuteRequest(@Nullable String schema, String sqlQry, Object[] args, int timeout) { + public OdbcQueryExecuteRequest(@Nullable String schema, String sqlQry, Object[] args, int timeout, + boolean autoCommit) { super(QRY_EXEC); assert sqlQry != null : "SQL query should not be null"; @@ -56,6 +61,7 @@ public class OdbcQueryExecuteRequest extends OdbcRequest { this.sqlQry = sqlQry; this.args = args; this.timeout = timeout; + this.autoCommit = autoCommit; } /** @@ -91,4 +97,10 @@ public class OdbcQueryExecuteRequest extends OdbcRequest { return S.toString(OdbcQueryExecuteRequest.class, this, "args", args, true); } + /** + * @return Autocommit flag. + */ + public boolean autoCommit() { + return autoCommit; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java index b0f27d0..9510e63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.FieldsQueryCursor; @@ -46,7 +47,9 @@ import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.NestedTxMode; import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -76,6 +79,9 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { /** Busy lock. */ private final GridSpinBusyLock busyLock; + /** Worker. */ + private final OdbcRequestHandlerWorker worker; + /** Maximum allowed cursors. */ private final int maxCursors; @@ -91,6 +97,9 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { /** Replicated only flag. */ private final boolean replicatedOnly; + /** Nested transaction behaviour. */ + private final NestedTxMode nestedTxMode; + /** Collocated flag. */ private final boolean collocated; @@ -101,7 +110,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { private final boolean skipReducerOnUpdate; /** Authentication context */ - private AuthorizationContext actx; + private final AuthorizationContext actx; /** Client version. */ private ClientListenerProtocolVersion ver; @@ -117,12 +126,13 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { * @param collocated Collocated flag. * @param lazy Lazy flag. * @param skipReducerOnUpdate Skip reducer on update flag. + * @param nestedTxMode Nested transaction mode. * @param actx Authentication context. * @param ver Client protocol version. */ public OdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, boolean distributedJoins, boolean enforceJoinOrder, boolean replicatedOnly, boolean collocated, boolean lazy, - boolean skipReducerOnUpdate, AuthorizationContext actx, ClientListenerProtocolVersion ver) { + boolean skipReducerOnUpdate, AuthorizationContext actx, NestedTxMode nestedTxMode, ClientListenerProtocolVersion ver) { this.ctx = ctx; this.busyLock = busyLock; this.maxCursors = maxCursors; @@ -133,20 +143,56 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { this.lazy = lazy; this.skipReducerOnUpdate = skipReducerOnUpdate; this.actx = actx; + this.nestedTxMode = nestedTxMode; this.ver = ver; log = ctx.log(getClass()); + + if (ctx.grid().configuration().isMvccEnabled()) + worker = new OdbcRequestHandlerWorker(ctx.igniteInstanceName(), log, this, ctx); + else + worker = null; } /** {@inheritDoc} */ @Override public ClientListenerResponse handle(ClientListenerRequest req0) { assert req0 != null; + assert req0 instanceof OdbcRequest; + OdbcRequest req = (OdbcRequest)req0; + if (worker == null) + return doHandle(req); + else { + GridFutureAdapter<ClientListenerResponse> fut = worker.process(req); + + try { + return fut.get(); + } + catch (IgniteCheckedException e) { + return exceptionToResult(e); + } + } + } + + /** + * Start worker, if it's present. + */ + void start() { + if (worker != null) + worker.start(); + } + + /** + * Handle ODBC request. + * @param req ODBC request. + * @return Response. + */ + public ClientListenerResponse doHandle(OdbcRequest req) { if (!busyLock.enterBusy()) return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, - "Failed to handle ODBC request because node is stopping: " + req); + "Failed to handle ODBC request because node is stopping: " + req); if (actx != null) AuthorizationContext.context(actx); @@ -204,6 +250,17 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { public void onDisconnect() { if (busyLock.enterBusy()) { + if (worker != null) { + worker.cancel(); + + try { + worker.join(); + } + catch (InterruptedException e) { + // No-op. + } + } + try { for (OdbcQueryResults res : qryResults.values()) @@ -220,9 +277,11 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { * @param schema Schema. * @param sql SQL request. * @param args Arguments. + * @param autoCommit Autocommit transaction. + * @param timeout Query timeout. * @return Query instance. */ - private SqlFieldsQueryEx makeQuery(String schema, String sql, Object[] args, int timeout) { + private SqlFieldsQueryEx makeQuery(String schema, String sql, Object[] args, int timeout, boolean autoCommit) { SqlFieldsQueryEx qry = new SqlFieldsQueryEx(sql, null); qry.setArgs(args); @@ -234,6 +293,8 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { qry.setLazy(lazy); qry.setSchema(schema); qry.setSkipReducerOnUpdate(skipReducerOnUpdate); + qry.setNestedTxMode(nestedTxMode); + qry.setAutoCommit(autoCommit); qry.setTimeout(timeout, TimeUnit.SECONDS); @@ -264,7 +325,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() + ", parsed=" + sql + ']'); - SqlFieldsQuery qry = makeQuery(req.schema(), sql, req.arguments(), req.timeout()); + SqlFieldsQuery qry = makeQuery(req.schema(), sql, req.arguments(), req.timeout(), req.autoCommit()); List<FieldsQueryCursor<List<?>>> cursors = ctx.query().querySqlFields(qry, true, false); @@ -313,7 +374,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() + ", parsed=" + sql + ']'); - SqlFieldsQueryEx qry = makeQuery(req.schema(), sql, null, req.timeout()); + SqlFieldsQueryEx qry = makeQuery(req.schema(), sql, null, req.timeout(), req.autoCommit()); Object[][] paramSet = req.arguments(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandlerWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandlerWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandlerWorker.java new file mode 100644 index 0000000..4184b6a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandlerWorker.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener; +import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.LinkedBlockingQueue; + +/** + * ODBC request handler worker to maintain single threaded transactional execution of SQL statements when MVCC is on.<p> + * This worker is intended for internal use as a temporary solution and from within {@link OdbcRequestHandler}, + * therefore it does not do any fine-grained lifecycle handling as it relies on existing guarantees from + * {@link ClientListenerNioListener}. + */ +class OdbcRequestHandlerWorker extends GridWorker { + /** Requests queue.*/ + private final LinkedBlockingQueue<T2<OdbcRequest, GridFutureAdapter<ClientListenerResponse>>> queue = + new LinkedBlockingQueue<>(); + + /** Handler.*/ + private final OdbcRequestHandler hnd; + + /** Context.*/ + private final GridKernalContext ctx; + + /** Response */ + private final static ClientListenerResponse ERR_RESPONSE = new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, + "Connection closed."); + + /** + * Constructor. + * @param igniteInstanceName Instance name. + * @param log Logger. + * @param hnd Handler. + * @param ctx Kernal context. + */ + OdbcRequestHandlerWorker(@Nullable String igniteInstanceName, IgniteLogger log, OdbcRequestHandler hnd, + GridKernalContext ctx) { + super(igniteInstanceName, "odbc-request-handler-worker", log); + + A.notNull(hnd, "hnd"); + + this.hnd = hnd; + + this.ctx = ctx; + } + + /** + * Start this worker. + */ + void start() { + new IgniteThread(this).start(); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + try { + while (!isCancelled()) { + T2<OdbcRequest, GridFutureAdapter<ClientListenerResponse>> req = queue.take(); + + GridFutureAdapter<ClientListenerResponse> fut = req.get2(); + + try { + ClientListenerResponse res = hnd.doHandle(req.get1()); + + fut.onDone(res); + } + catch (Exception e) { + fut.onDone(e); + } + } + } + finally { + // Notify indexing that this worker is being stopped. + try { + ctx.query().getIndexing().onClientDisconnect(); + } + catch (Exception e) { + // No-op. + } + + // Drain the queue on stop. + T2<OdbcRequest, GridFutureAdapter<ClientListenerResponse>> req = queue.poll(); + + while (req != null) { + req.get2().onDone(ERR_RESPONSE); + + req = queue.poll(); + } + } + } + + /** + * Initiate request processing. + * @param req Request. + * @return Future to track request processing. + */ + GridFutureAdapter<ClientListenerResponse> process(OdbcRequest req) { + GridFutureAdapter<ClientListenerResponse> fut = new GridFutureAdapter<>(); + + queue.add(new T2<>(req, fut)); + + return fut; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java new file mode 100644 index 0000000..fdb6f1e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/EnlistOperation.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.jetbrains.annotations.Nullable; + +/** + * Operations on entries which could be performed during transaction. + * Operations are used during SQL statements execution, but does not define exact SQL statements semantics. + * It is better to treat them independently and having their own semantics. + */ +public enum EnlistOperation { + /** + * This operation creates entry if it does not exist or raises visible failure otherwise. + */ + INSERT(GridCacheOperation.CREATE), + /** + * This operation creates entry if it does not exist or modifies existing one otherwise. + */ + UPSERT(GridCacheOperation.UPDATE), + /** + * This operation modifies existing entry or does nothing if entry does not exist. + */ + UPDATE(GridCacheOperation.UPDATE), + /** + * This operation deletes existing entry or does nothing if entry does not exist. + */ + DELETE(GridCacheOperation.DELETE), + /** + * This operation locks existing entry protecting it from updates by other transactions + * or does notrhing if entry does not exist. + */ + LOCK(null); + + /** */ + private final GridCacheOperation cacheOp; + + /** */ + EnlistOperation(GridCacheOperation cacheOp) { + this.cacheOp = cacheOp; + } + + /** + * @return Corresponding Cache operation. + */ + public GridCacheOperation cacheOperation() { + return cacheOp; + } + + /** */ + public boolean isDeleteOrLock() { + return this == DELETE || this == LOCK; + } + + /** + * Indicates that an operation cannot create new row. + */ + public boolean noCreate() { + // has no meaning for LOCK + assert this != LOCK; + + return this == UPDATE || this == DELETE; + } + + /** Enum values. */ + private static final EnlistOperation[] VALS = values(); + + /** + * @param ord Ordinal value. + * @return Enum value. + */ + @Nullable public static EnlistOperation fromOrdinal(int ord) { + return ord < 0 || ord >= VALS.length ? null : VALS[ord]; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index dedd075..7aa4021 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -31,6 +31,8 @@ import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -61,6 +63,13 @@ public interface GridQueryIndexing { public void stop() throws IgniteCheckedException; /** + * Performs necessary actions on disconnect of a stateful client (say, one associated with a transaction). + * + * @throws IgniteCheckedException If failed. + */ + public void onClientDisconnect() throws IgniteCheckedException; + + /** * Parses SQL query into two step query and executes it. * * @param schemaName Schema name. @@ -79,10 +88,11 @@ public interface GridQueryIndexing { * @param cliCtx Client context. * @param keepBinary Keep binary flag. * @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query. - * @param cancel Query cancel state handler. @return Cursor. + * @param tracker Query tracker. + * @return Cursor. */ public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, - SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel); + SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, MvccQueryTracker tracker, GridQueryCancel cancel); /** * Execute an INSERT statement using data streamer as receiver. @@ -223,6 +233,28 @@ public interface GridQueryIndexing { public void unregisterCache(GridCacheContext cctx, boolean rmvIdx) throws IgniteCheckedException; /** + * + * @param cctx Cache context. + * @param ids Involved cache ids. + * @param parts Partitions. + * @param schema Schema name. + * @param qry Query string. + * @param params Query parameters. + * @param flags Flags. + * @param pageSize Fetch page size. + * @param timeout Timeout. + * @param topVer Topology version. + * @param mvccSnapshot MVCC snapshot. + * @param cancel Query cancel object. + * @return Cursor over entries which are going to be changed. + * @throws IgniteCheckedException If failed. + */ + public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] ids, int[] parts, + String schema, String qry, Object[] params, int flags, + int pageSize, int timeout, AffinityTopologyVersion topVer, + MvccSnapshot mvccSnapshot, GridQueryCancel cancel) throws IgniteCheckedException; + + /** * Registers type if it was not known before or updates it otherwise. * * @param cctx Cache context. @@ -243,7 +275,10 @@ public interface GridQueryIndexing { * @param prevRowAvailable Whether previous row is available. * @throws IgniteCheckedException If failed. */ - public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row, CacheDataRow prevRow, + public void store(GridCacheContext cctx, + GridQueryTypeDescriptor type, + CacheDataRow row, + CacheDataRow prevRow, boolean prevRowAvailable) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 57eaa4a..bd5d91c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -1752,15 +1753,20 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param cacheIds Cache IDs. * @return Future that will be completed when rebuilding is finished. */ - public IgniteInternalFuture<?> rebuildIndexesFromHash(Collection<Integer> cacheIds) { + public IgniteInternalFuture<?> rebuildIndexesFromHash(Set<Integer> cacheIds) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to rebuild indexes from hash (grid is stopping)."); + // Because of alt type ids, there can be few entries in 'types' for a single cache. + // In order to avoid processing a cache more than once, let's track processed names. + Set<String> processedCacheNames = new HashSet<>(); + try { GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<Object, Object>(); for (Map.Entry<QueryTypeIdKey, QueryTypeDescriptorImpl> e : types.entrySet()) { - if (cacheIds.contains(CU.cacheId(e.getKey().cacheName()))) + if (cacheIds.contains(CU.cacheId(e.getKey().cacheName())) && + processedCacheNames.add(e.getKey().cacheName())) fut.add(rebuildIndexesFromHash(e.getKey().cacheName(), e.getValue())); } @@ -2040,6 +2046,32 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * + * @param cctx Cache context. + * @param cacheIds Involved cache ids. + * @param parts Partitions. + * @param schema Schema name. + * @param qry Query string. + * @param params Query parameters. + * @param flags Flags. + * @param pageSize Fetch page size. + * @param timeout Timeout. + * @param topVer Topology version. + * @param mvccSnapshot MVCC snapshot. + * @param cancel Query cancel object. + * @return Cursor over entries which are going to be changed. + * @throws IgniteCheckedException If failed. + */ + public UpdateSourceIterator<?> prepareDistributedUpdate(GridCacheContext<?, ?> cctx, int[] cacheIds, + int[] parts, String schema, String qry, Object[] params, int flags, int pageSize, int timeout, + AffinityTopologyVersion topVer, MvccSnapshot mvccSnapshot, + GridQueryCancel cancel) throws IgniteCheckedException { + checkxEnabled(); + + return idx.prepareDistributedUpdate(cctx, cacheIds, parts, schema, qry, params, flags, pageSize, timeout, topVer, mvccSnapshot, cancel); + } + + /** * Query SQL fields. * * @param qry Query. @@ -2104,7 +2136,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { GridQueryCancel cancel = new GridQueryCancel(); List<FieldsQueryCursor<List<?>>> res = - idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel); + idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, null, cancel); if (cctx != null) sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx); @@ -2519,15 +2551,15 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param cctx Cache context. - * @param val Row. + * @param row Row removed from cache. * @throws IgniteCheckedException Thrown in case of any errors. */ - public void remove(GridCacheContext cctx, CacheDataRow val) + public void remove(GridCacheContext cctx, CacheDataRow row) throws IgniteCheckedException { - assert val != null; + assert row != null; if (log.isDebugEnabled()) - log.debug("Remove [cacheName=" + cctx.name() + ", key=" + val.key()+ ", val=" + val.value() + "]"); + log.debug("Remove [cacheName=" + cctx.name() + ", key=" + row.key()+ ", val=" + row.value() + "]"); if (idx == null) return; @@ -2538,14 +2570,14 @@ public class GridQueryProcessor extends GridProcessorAdapter { try { QueryTypeDescriptorImpl desc = typeByValue(cctx.name(), cctx.cacheObjectContext(), - val.key(), - val.value(), + row.key(), + row.value(), false); if (desc == null) return; - idx.remove(cctx, desc, val); + idx.remove(cctx, desc, row); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NestedTxMode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NestedTxMode.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NestedTxMode.java new file mode 100644 index 0000000..3569003 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NestedTxMode.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.IgniteException; + +/** + * Behavior options when an attempt to start a nested transaction is made. + */ +public enum NestedTxMode { + /** Previously started transaction will be committed, new transaction will be started. */ + COMMIT, + + /** Warning will be printed to log, no new transaction will be started. */ + IGNORE, + + /** Exception will be thrown, previously started transaction will be rolled back. */ + ERROR; + + /** Default handling mode. */ + public final static NestedTxMode DEFAULT = ERROR; + + /** + * Get enum value from int + * + * @param val Int value. + * @return Enum value. + * @throws IgniteException if the is no enum value associated with the int value. + */ + public static NestedTxMode fromByte(byte val) { + switch (val) { + case 1: + return COMMIT; + + case 2: + return IGNORE; + + case 3: + return ERROR; + + default: + throw new IgniteException("Invalid nested transactions handling mode: " + val); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/UpdateSourceIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/UpdateSourceIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/UpdateSourceIterator.java new file mode 100644 index 0000000..69feb0f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/UpdateSourceIterator.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.Iterator; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.jetbrains.annotations.NotNull; + +/** */ +public interface UpdateSourceIterator<T> extends GridCloseableIterator<T> { + /** + * @return Operation. + */ + public EnlistOperation operation(); + + /** + * Callback method which should be called before moving iteration into another thread. + */ + public default void beforeDetach() { + // No-op. + } + + /** {@inheritDoc} */ + @Override default void close() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override default boolean isClosed() { + return false; + } + + /** {@inheritDoc} */ + @Override default void removeX() throws IgniteCheckedException { + throw new UnsupportedOperationException("remove"); + } + + /** {@inheritDoc} */ + @Override default boolean hasNext() { + try { + return hasNextX(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override default T next() { + try { + return nextX(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override default void remove() { + try { + removeX(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override @NotNull default Iterator<T> iterator() { + return this; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index 4d918a0..2d1ec36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -70,6 +70,9 @@ public class GridQueryNextPageResponse implements Message { /** Last page flag. */ private boolean last; + /** Remove mapping flag. */ + private boolean removeMapping; + /** * For {@link Externalizable}. */ @@ -230,6 +233,12 @@ public class GridQueryNextPageResponse implements Message { return false; writer.incrementState(); + + case 9: + if (!writer.writeBoolean("removeMapping", removeMapping)) + return false; + + writer.incrementState(); } return true; @@ -314,6 +323,13 @@ public class GridQueryNextPageResponse implements Message { return false; reader.incrementState(); + case 9: + removeMapping = reader.readBoolean("removeMapping"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridQueryNextPageResponse.class); @@ -326,7 +342,7 @@ public class GridQueryNextPageResponse implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; } /** @@ -357,6 +373,20 @@ public class GridQueryNextPageResponse implements Message { this.last = last; } + /** + * @param removeMapping Remove mapping flag. + */ + public void removeMapping(boolean removeMapping) { + this.removeMapping = removeMapping; + } + + /** + * @return Remove mapping flag. + */ + public boolean removeMapping() { + return removeMapping; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridQueryNextPageResponse.class, this, http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java index 6c0942a..6db7fa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.jetbrains.annotations.NotNull; @@ -35,6 +36,10 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { /** */ private List<MetastorageLifecycleListener> metastorageListeners = new ArrayList<>(); + /** */ + private List<DatabaseLifecycleListener> databaseListeners = new ArrayList<>(); + + /** * @param ctx Kernal context. */ @@ -54,4 +59,17 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { public List<MetastorageLifecycleListener> getMetastorageSubscribers() { return metastorageListeners; } + + /** */ + public void registerDatabaseListener(@NotNull DatabaseLifecycleListener databaseListener) { + if (databaseListener == null) + throw new NullPointerException("Database subscriber should be not-null."); + + databaseListeners.add(databaseListener); + } + + /** */ + public List<DatabaseLifecycleListener> getDatabaseListeners() { + return databaseListeners; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java index be76482..658e176 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.sql; import java.lang.reflect.Field; +import java.util.Collections; import java.util.HashSet; +import java.util.Set; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.util.typedef.F; @@ -38,6 +40,9 @@ public class SqlKeyword { /** Keyword: BATCH_SIZE */ public static final String BATCH_SIZE = "BATCH_SIZE"; + /** Keyword: BEGIN. */ + public static final String BEGIN = "BEGIN"; + /** Keyword: BIGINT */ public static final String BIGINT = "BIGINT"; @@ -65,6 +70,9 @@ public class SqlKeyword { /** Keyword: CHARSET. */ public static final String CHARSET = "CHARSET"; + /** Keyword: COMMIT. */ + public static final String COMMIT = "COMMIT"; + /** Keyword: COPY. */ public static final String COPY = "COPY"; @@ -206,6 +214,9 @@ public class SqlKeyword { /** Keyword: RESTRICT. */ public static final String RESTRICT = "RESTRICT"; + /** Keyword: ROLLBACK. */ + public static final String ROLLBACK = "ROLLBACK"; + /** Keyword: SET. */ public static final String SET = "SET"; @@ -221,6 +232,9 @@ public class SqlKeyword { /** Keyword: SPATIAL. */ public static final String SPATIAL = "SPATIAL"; + /** Keyword: START. */ + public static final String START = "START"; + /** Keyword: STREAMING. */ public static final String STREAMING = "STREAMING"; @@ -236,6 +250,9 @@ public class SqlKeyword { /** Keyword: TINYINT. */ public static final String TINYINT = "TINYINT"; + /** Keyword: TRANSACTION. */ + public static final String TRANSACTION = "TRANSACTION"; + /** Keyword: UNIQUE. */ public static final String UNIQUE = "UNIQUE"; @@ -251,6 +268,9 @@ public class SqlKeyword { /** Keyword: VARCHAR_CASESENSITIVE. */ public static final String VARCHAR_CASESENSITIVE = "VARCHAR_CASESENSITIVE"; + /** Keyword: WORK. */ + public static final String WORK = "WORK"; + /** Keyword: YEAR. */ public static final String YEAR = "YEAR"; http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java index d46863a..0be2623 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java @@ -17,17 +17,22 @@ package org.apache.ignite.internal.sql; +import org.apache.ignite.internal.sql.command.SqlBeginTransactionCommand; import org.apache.ignite.internal.sql.command.SqlAlterTableCommand; import org.apache.ignite.internal.sql.command.SqlAlterUserCommand; import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand; import org.apache.ignite.internal.sql.command.SqlCommand; +import org.apache.ignite.internal.sql.command.SqlCommitTransactionCommand; import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand; import org.apache.ignite.internal.sql.command.SqlCreateUserCommand; import org.apache.ignite.internal.sql.command.SqlDropIndexCommand; import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand; +import org.apache.ignite.internal.sql.command.SqlRollbackTransactionCommand; import org.apache.ignite.internal.sql.command.SqlDropUserCommand; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.sql.SqlKeyword.BEGIN; +import static org.apache.ignite.internal.sql.SqlKeyword.COMMIT; import static org.apache.ignite.internal.sql.SqlKeyword.ALTER; import static org.apache.ignite.internal.sql.SqlKeyword.COPY; import static org.apache.ignite.internal.sql.SqlKeyword.CREATE; @@ -35,15 +40,21 @@ import static org.apache.ignite.internal.sql.SqlKeyword.DROP; import static org.apache.ignite.internal.sql.SqlKeyword.HASH; import static org.apache.ignite.internal.sql.SqlKeyword.INDEX; import static org.apache.ignite.internal.sql.SqlKeyword.PRIMARY; +import static org.apache.ignite.internal.sql.SqlKeyword.ROLLBACK; import static org.apache.ignite.internal.sql.SqlKeyword.SET; import static org.apache.ignite.internal.sql.SqlKeyword.SPATIAL; +import static org.apache.ignite.internal.sql.SqlKeyword.START; +import static org.apache.ignite.internal.sql.SqlKeyword.TRANSACTION; import static org.apache.ignite.internal.sql.SqlKeyword.STREAMING; import static org.apache.ignite.internal.sql.SqlKeyword.TABLE; import static org.apache.ignite.internal.sql.SqlKeyword.UNIQUE; +import static org.apache.ignite.internal.sql.SqlKeyword.WORK; import static org.apache.ignite.internal.sql.SqlKeyword.USER; import static org.apache.ignite.internal.sql.SqlParserUtils.errorUnexpectedToken; import static org.apache.ignite.internal.sql.SqlParserUtils.errorUnsupportedIfMatchesKeyword; import static org.apache.ignite.internal.sql.SqlParserUtils.matchesKeyword; +import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatchesKeyword; +import static org.apache.ignite.internal.sql.SqlParserUtils.skipIfMatchesOptionalKeyword; /** * SQL parser. @@ -102,6 +113,16 @@ public class SqlParser { SqlCommand cmd = null; switch (lex.token()) { + case BEGIN: + cmd = processBegin(); + + break; + + case COMMIT: + cmd = processCommit(); + + break; + case CREATE: cmd = processCreate(); @@ -112,6 +133,16 @@ public class SqlParser { break; + case ROLLBACK: + cmd = processRollback(); + + break; + + case START: + cmd = processStart(); + + break; + case COPY: try { cmd = processCopy(); @@ -139,7 +170,7 @@ public class SqlParser { return cmd; } else - throw errorUnexpectedToken(lex, CREATE, DROP, ALTER, COPY, SET); + throw errorUnexpectedToken(lex, BEGIN, COMMIT, CREATE, DROP, ROLLBACK, COPY, SET, ALTER, START); case QUOTED: case MINUS: @@ -154,6 +185,30 @@ public class SqlParser { } /** + * Process BEGIN keyword. + * + * @return Command. + */ + private SqlCommand processBegin() { + skipIfMatchesOptionalKeyword(lex, TRANSACTION); + + skipIfMatchesOptionalKeyword(lex, WORK); + + return new SqlBeginTransactionCommand(); + } + + /** + * Process COMMIT keyword. + * + * @return Command. + */ + private SqlCommand processCommit() { + skipIfMatchesOptionalKeyword(lex, TRANSACTION); + + return new SqlCommitTransactionCommand(); + } + + /** * Process SET keyword. * * @return Command. @@ -214,7 +269,7 @@ public class SqlParser { errorUnsupportedIfMatchesKeyword(lex, HASH, PRIMARY, UNIQUE); } - throw errorUnexpectedToken(lex, INDEX, SPATIAL); + throw errorUnexpectedToken(lex, INDEX, SPATIAL, USER); } /** @@ -242,7 +297,29 @@ public class SqlParser { return cmd.parse(lex); } - throw errorUnexpectedToken(lex, INDEX); + throw errorUnexpectedToken(lex, INDEX, USER); + } + + /** + * Process ROLLBACK keyword. + * + * @return Command. + */ + private SqlCommand processRollback() { + skipIfMatchesOptionalKeyword(lex, TRANSACTION); + + return new SqlRollbackTransactionCommand(); + } + + /** + * Process START keyword. + * + * @return Command. + */ + private SqlCommand processStart() { + skipIfMatchesKeyword(lex, TRANSACTION); + + return new SqlBeginTransactionCommand(); } /** @@ -270,6 +347,6 @@ public class SqlParser { return cmd.parse(lex); } - throw errorUnexpectedToken(lex, TABLE); + throw errorUnexpectedToken(lex, TABLE, USER); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java index e99af1d..074bffa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java @@ -261,6 +261,20 @@ public class SqlParserUtils { } /** + * Skip token if it matches expected keyword by using lookahead. + * If next token is not what we expect, no shift is done. + * + * @param lex Lexer. + * @param expKeyword Expected keyword. + */ + static void skipIfMatchesOptionalKeyword(SqlLexer lex, String expKeyword) { + SqlLexerToken nextTok = lex.lookAhead(); + + if (matchesKeyword(nextTok, expKeyword)) + lex.shift(); + } + + /** * Skip next token if it matches expected type. * * @param lex Lexer. http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBeginTransactionCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBeginTransactionCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBeginTransactionCommand.java new file mode 100644 index 0000000..e890cc4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlBeginTransactionCommand.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.command; + +import org.apache.ignite.internal.sql.SqlLexer; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * BEGIN [TRANSACTION] command. + */ +public class SqlBeginTransactionCommand implements SqlCommand { + /** {@inheritDoc} */ + @Override public SqlCommand parse(SqlLexer lex) { + return this; + } + + /** {@inheritDoc} */ + @Override public String schemaName() { + return null; + } + + /** {@inheritDoc} */ + @Override public void schemaName(String schemaName) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SqlBeginTransactionCommand.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlCommitTransactionCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlCommitTransactionCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlCommitTransactionCommand.java new file mode 100644 index 0000000..da14dea --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlCommitTransactionCommand.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.command; + +import org.apache.ignite.internal.sql.SqlLexer; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * COMMIT command. + */ +public class SqlCommitTransactionCommand implements SqlCommand { + /** {@inheritDoc} */ + @Override public SqlCommand parse(SqlLexer lex) { + return this; + } + + /** {@inheritDoc} */ + @Override public String schemaName() { + return null; + } + + /** {@inheritDoc} */ + @Override public void schemaName(String schemaName) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SqlCommitTransactionCommand.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlRollbackTransactionCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlRollbackTransactionCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlRollbackTransactionCommand.java new file mode 100644 index 0000000..341b794 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlRollbackTransactionCommand.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.command; + +import org.apache.ignite.internal.sql.SqlLexer; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * ROLLBACK command. + */ +public class SqlRollbackTransactionCommand implements SqlCommand { + /** {@inheritDoc} */ + @Override public SqlCommand parse(SqlLexer lex) { + return this; + } + + /** {@inheritDoc} */ + @Override public String schemaName() { + return null; + } + + /** {@inheritDoc} */ + @Override public void schemaName(String schemaName) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SqlRollbackTransactionCommand.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxMvccVersionCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxMvccVersionCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxMvccVersionCheckedException.java new file mode 100644 index 0000000..8a0a54b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxMvccVersionCheckedException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.transactions; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Exception thrown whenever transaction enters an unknown state. + */ +public class IgniteTxMvccVersionCheckedException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Creates new exception with given error message. + * + * @param msg Error message. + */ + public IgniteTxMvccVersionCheckedException(String msg) { + super(msg); + } + + /** + * Creates new exception with given error message and optional nested exception. + * + * @param msg Error message. + * @param cause Optional nested exception (can be <tt>null</tt>). + */ + public IgniteTxMvccVersionCheckedException(String msg, Throwable cause) { + super(msg, cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index a724060..87f5882 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -285,12 +285,12 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig onDone(rdc != null ? rdc.reduce() : null); } catch (RuntimeException e) { - logError(null, "Failed to execute compound future reducer: " + this, e); + logError(logger(), "Failed to execute compound future reducer: " + this, e); onDone(e); } catch (AssertionError e) { - logError(null, "Failed to execute compound future reducer: " + this, e); + logError(logger(), "Failed to execute compound future reducer: " + this, e); onDone(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index b36bf16..7b68a6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; @@ -65,6 +66,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { /** */ private IgniteBiPredicate<ClusterNode, Message> blockP; + /** */ + private volatile IgniteBiInClosure<ClusterNode, Message> c; + /** * @param node Node. * @return Test SPI. @@ -87,6 +91,9 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { Message msg0 = ioMsg.message(); + if (c != null) + c.apply(node, msg0); + synchronized (this) { boolean record = (recordClasses != null && recordClasses.contains(msg0.getClass())) || (recordP != null && recordP.apply(node, msg0)); @@ -196,20 +203,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { * @throws InterruptedException If interrupted. */ public void waitForBlocked() throws InterruptedException { - synchronized (this) { - while (blockedMsgs.isEmpty()) - wait(); - } + waitForBlocked(1); } /** - * @param cnt Number of messages to wait. - * + * @param size Number of messages to wait for. * @throws InterruptedException If interrupted. */ - public void waitForBlocked(int cnt) throws InterruptedException { + public void waitForBlocked(int size) throws InterruptedException { synchronized (this) { - while (blockedMsgs.size() < cnt) + while (blockedMsgs.size() < size) wait(); } } @@ -240,6 +243,13 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** + * @param c Message closure. + */ + public void closure(IgniteBiInClosure<ClusterNode, Message> c) { + this.c = c; + } + + /** * @param blockP Message block predicate. */ public void blockMessages(IgniteBiPredicate<ClusterNode, Message> blockP) {