http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java index 7677d0d..119a389 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java @@ -23,7 +23,10 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -33,6 +36,9 @@ import java.util.Map; * Descriptor of type. */ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { + /** Space. */ + private final String space; + /** */ private String name; @@ -50,9 +56,15 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { /** Map with upper cased property names to help find properties based on SQL INSERT and MERGE queries. */ private final Map<String, GridQueryProperty> uppercaseProps = new HashMap<>(); + /** Mutex for operations on indexes. */ + private final Object idxMux = new Object(); + /** */ @GridToStringInclude - private final Map<String, QueryIndexDescriptorImpl> indexes = new HashMap<>(); + private final Map<String, QueryIndexDescriptorImpl> idxs = new HashMap<>(); + + /** Aliases. */ + private Map<String, String> aliases; /** */ private QueryIndexDescriptorImpl fullTextIdx; @@ -78,6 +90,25 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { /** */ private String affKey; + /** Obsolete. */ + private volatile boolean obsolete; + + /** + * Constructor. + * + * @param space Cache name. + */ + public QueryTypeDescriptorImpl(String space) { + this.space = space; + } + + /** + * @return Space. + */ + public String space() { + return space; + } + /** {@inheritDoc} */ @Override public String name() { return name; @@ -97,7 +128,7 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { * @return Table name. */ @Override public String tableName() { - return tblName; + return tblName != null ? tblName : name; } /** @@ -160,7 +191,9 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { /** {@inheritDoc} */ @Override public Map<String, GridQueryIndexDescriptor> indexes() { - return Collections.<String, GridQueryIndexDescriptor>unmodifiableMap(indexes); + synchronized (idxMux) { + return Collections.<String, GridQueryIndexDescriptor>unmodifiableMap(idxs); + } } /** {@inheritDoc} */ @@ -176,59 +209,74 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { } /** - * Adds index. + * Get index by name. * - * @param idxName Index name. - * @param type Index type. - * @param inlineSize Inline size. - * @return Index descriptor. - * @throws IgniteCheckedException In case of error. + * @param name Name. + * @return Index. */ - public QueryIndexDescriptorImpl addIndex(String idxName, QueryIndexType type, int inlineSize) throws IgniteCheckedException { - QueryIndexDescriptorImpl idx = new QueryIndexDescriptorImpl(type, inlineSize); + @Nullable public QueryIndexDescriptorImpl index(String name) { + synchronized (idxMux) { + return idxs.get(name); + } + } - if (indexes.put(idxName, idx) != null) - throw new IgniteCheckedException("Index with name '" + idxName + "' already exists."); + /** + * @return Raw index descriptors. + */ + public Collection<QueryIndexDescriptorImpl> indexes0() { + synchronized (idxMux) { + return new ArrayList<>(idxs.values()); + } + } - return idx; + /** {@inheritDoc} */ + @Override public GridQueryIndexDescriptor textIndex() { + return fullTextIdx; } /** - * Adds field to index. + * Add index. * - * @param idxName Index name. - * @param field Field name. - * @param orderNum Fields order number in index. - * @param inlineSize Inline size. - * @param descending Sorting order. + * @param idx Index. * @throws IgniteCheckedException If failed. */ - public void addFieldToIndex( - String idxName, - String field, - int orderNum, - int inlineSize, - boolean descending - ) throws IgniteCheckedException { - QueryIndexDescriptorImpl desc = indexes.get(idxName); + public void addIndex(QueryIndexDescriptorImpl idx) throws IgniteCheckedException { + synchronized (idxMux) { + if (idxs.put(idx.name(), idx) != null) + throw new IgniteCheckedException("Index with name '" + idx.name() + "' already exists."); + } + } - if (desc == null) - desc = addIndex(idxName, QueryIndexType.SORTED, inlineSize); + /** + * Drop index. + * + * @param idxName Index name. + */ + public void dropIndex(String idxName) { + synchronized (idxMux) { + idxs.remove(idxName); + } + } - desc.addField(field, orderNum, descending); + /** + * Chedk if particular field exists. + * + * @param field Field. + * @return {@code True} if exists. + */ + public boolean hasField(String field) { + return props.containsKey(field) || QueryUtils._VAL.equalsIgnoreCase(field); } /** * Adds field to text index. * * @param field Field name. + * @throws IgniteCheckedException If failed. */ - public void addFieldToTextIndex(String field) { - if (fullTextIdx == null) { - fullTextIdx = new QueryIndexDescriptorImpl(QueryIndexType.FULLTEXT, 0); - - indexes.put(null, fullTextIdx); - } + public void addFieldToTextIndex(String field) throws IgniteCheckedException { + if (fullTextIdx == null) + fullTextIdx = new QueryIndexDescriptorImpl(this, null, QueryIndexType.FULLTEXT, 0); fullTextIdx.addField(field, 0, false); } @@ -335,6 +383,34 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor { this.affKey = affKey; } + /** + * @return Aliases. + */ + public Map<String, String> aliases() { + return aliases != null ? aliases : Collections.<String, String>emptyMap(); + } + + /** + * @param aliases Aliases. + */ + public void aliases(Map<String, String> aliases) { + this.aliases = aliases; + } + + /** + * @return {@code True} if obsolete. + */ + public boolean obsolete() { + return obsolete; + } + + /** + * Mark index as obsolete. + */ + public void markObsolete() { + obsolete = true; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(QueryTypeDescriptorImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java index f00cbd6..3a7437b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty; import org.apache.ignite.internal.processors.query.property.QueryClassProperty; import org.apache.ignite.internal.processors.query.property.QueryFieldAccessor; @@ -44,7 +45,6 @@ import java.lang.reflect.Method; import java.math.BigDecimal; import java.sql.Time; import java.sql.Timestamp; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -52,6 +52,9 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE; +import static org.apache.ignite.IgniteSystemProperties.getInteger; + /** * Utility methods for queries. */ @@ -59,6 +62,9 @@ public class QueryUtils { /** */ public static final String _VAL = "_val"; + /** Discovery history size. */ + private static final int DISCO_HIST_SIZE = getInteger(IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE, 1000); + /** */ private static final Class<?> GEOMETRY_CLASS = U.classForName("com.vividsolutions.jts.geom.Geometry", null); @@ -82,6 +88,69 @@ public class QueryUtils { )); /** + * Get table name for entity. + * + * @param entity Entity. + * @return Table name. + */ + public static String tableName(QueryEntity entity) { + String res = entity.getTableName(); + + if (res == null) + res = typeName(entity.getValueType()); + + return res; + } + + /** + * Get index name. + * + * @param entity Query entity. + * @param idx Index. + * @return Index name. + */ + public static String indexName(QueryEntity entity, QueryIndex idx) { + return indexName(tableName(entity), idx); + } + + /** + * Get index name. + * + * @param tblName Table name. + * @param idx Index. + * @return Index name. + */ + public static String indexName(String tblName, QueryIndex idx) { + String res = idx.getName(); + + if (res == null) { + StringBuilder idxName = new StringBuilder(tblName + "_"); + + for (Map.Entry<String, Boolean> field : idx.getFields().entrySet()) { + idxName.append(field.getKey()); + + idxName.append('_'); + idxName.append(field.getValue() ? "asc_" : "desc_"); + } + + for (int i = 0; i < idxName.length(); i++) { + char ch = idxName.charAt(i); + + if (Character.isWhitespace(ch)) + idxName.setCharAt(i, '_'); + else + idxName.setCharAt(i, Character.toLowerCase(ch)); + } + + idxName.append("idx"); + + return idxName.toString(); + } + + return res; + } + + /** * Create type candidate for query entity. * * @param space Space. @@ -103,7 +172,9 @@ public class QueryUtils { CacheObjectContext coCtx = binaryEnabled ? ctx.cacheObjects().contextForCache(ccfg) : null; - QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(); + QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(space); + + desc.aliases(qryEntity.getAliases()); // Key and value classes still can be available if they are primitive or JDK part. // We need that to set correct types for _key and _val columns. @@ -206,11 +277,6 @@ public class QueryUtils { */ public static void processBinaryMeta(GridKernalContext ctx, QueryEntity qryEntity, QueryTypeDescriptorImpl d) throws IgniteCheckedException { - Map<String,String> aliases = qryEntity.getAliases(); - - if (aliases == null) - aliases = Collections.emptyMap(); - Set<String> keyFields = qryEntity.getKeyFields(); // We have to distinguish between empty and null keyFields when the key is not of SQL type - @@ -239,7 +305,7 @@ public class QueryUtils { isKeyField = (hasKeyFields ? keyFields.contains(entry.getKey()) : null); QueryBinaryProperty prop = buildBinaryProperty(ctx, entry.getKey(), - U.classForName(entry.getValue(), Object.class, true), aliases, isKeyField); + U.classForName(entry.getValue(), Object.class, true), d.aliases(), isKeyField); d.addProperty(prop, false); } @@ -256,18 +322,13 @@ public class QueryUtils { */ public static void processClassMeta(QueryEntity qryEntity, QueryTypeDescriptorImpl d, CacheObjectContext coCtx) throws IgniteCheckedException { - Map<String,String> aliases = qryEntity.getAliases(); - - if (aliases == null) - aliases = Collections.emptyMap(); - for (Map.Entry<String, String> entry : qryEntity.getFields().entrySet()) { QueryClassProperty prop = buildClassProperty( d.keyClass(), d.valueClass(), entry.getKey(), U.classForName(entry.getValue(), Object.class), - aliases, + d.aliases(), coCtx); d.addProperty(prop, false); @@ -275,7 +336,7 @@ public class QueryUtils { processIndexes(qryEntity, d); } - + /** * Processes indexes based on query entity. * @@ -285,53 +346,90 @@ public class QueryUtils { */ private static void processIndexes(QueryEntity qryEntity, QueryTypeDescriptorImpl d) throws IgniteCheckedException { if (!F.isEmpty(qryEntity.getIndexes())) { - Map<String, String> aliases = qryEntity.getAliases(); + for (QueryIndex idx : qryEntity.getIndexes()) + processIndex(idx, d); + } + } - if (aliases == null) - aliases = Collections.emptyMap(); + /** + * Process dynamic index change. + * + * @param idx Index. + * @param d Type descriptor to populate. + * @throws IgniteCheckedException If failed to build index information. + */ + public static void processDynamicIndexChange(String idxName, @Nullable QueryIndex idx, QueryTypeDescriptorImpl d) + throws IgniteCheckedException { + d.dropIndex(idxName); - for (QueryIndex idx : qryEntity.getIndexes()) { - String idxName = idx.getName(); + if (idx != null) + processIndex(idx, d); + } - if (idxName == null) - idxName = QueryEntity.defaultIndexName(idx); + /** + * Create index descriptor. + * + * @param typeDesc Type descriptor. + * @param idx Index. + * @return Index descriptor. + * @throws IgniteCheckedException If failed. + */ + public static QueryIndexDescriptorImpl createIndexDescriptor(QueryTypeDescriptorImpl typeDesc, QueryIndex idx) + throws IgniteCheckedException { + String idxName = indexName(typeDesc.tableName(), idx); + QueryIndexType idxTyp = idx.getIndexType(); - QueryIndexType idxTyp = idx.getIndexType(); + assert idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL; - if (idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL) { - d.addIndex(idxName, idxTyp, idx.getInlineSize()); + QueryIndexDescriptorImpl res = new QueryIndexDescriptorImpl(typeDesc, idxName, idxTyp, idx.getInlineSize()); - int i = 0; + int i = 0; - for (Map.Entry<String, Boolean> entry : idx.getFields().entrySet()) { - String field = entry.getKey(); - boolean asc = entry.getValue(); + for (Map.Entry<String, Boolean> entry : idx.getFields().entrySet()) { + String field = entry.getKey(); + boolean asc = entry.getValue(); - String alias = aliases.get(field); + String alias = typeDesc.aliases().get(field); - if (alias != null) - field = alias; + if (alias != null) + field = alias; - d.addFieldToIndex(idxName, field, i++, idx.getInlineSize(), !asc); - } - } - else if (idxTyp == QueryIndexType.FULLTEXT){ - for (String field : idx.getFields().keySet()) { - String alias = aliases.get(field); + res.addField(field, i++, !asc); + } - if (alias != null) - field = alias; + return res; + } - d.addFieldToTextIndex(field); - } - } - else if (idxTyp != null) - throw new IllegalArgumentException("Unsupported index type [idx=" + idx.getName() + - ", typ=" + idxTyp + ']'); - else - throw new IllegalArgumentException("Index type is not set: " + idx.getName()); + /** + * Process single index. + * + * @param idx Index. + * @param d Type descriptor to populate. + * @throws IgniteCheckedException If failed to build index information. + */ + private static void processIndex(QueryIndex idx, QueryTypeDescriptorImpl d) throws IgniteCheckedException { + QueryIndexType idxTyp = idx.getIndexType(); + + if (idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL) { + QueryIndexDescriptorImpl idxDesc = createIndexDescriptor(d, idx); + + d.addIndex(idxDesc); + } + else if (idxTyp == QueryIndexType.FULLTEXT){ + for (String field : idx.getFields().keySet()) { + String alias = d.aliases().get(field); + + if (alias != null) + field = alias; + + d.addFieldToTextIndex(field); } } + else if (idxTyp != null) + throw new IllegalArgumentException("Unsupported index type [idx=" + idx.getName() + + ", typ=" + idxTyp + ']'); + else + throw new IllegalArgumentException("Index type is not set: " + idx.getName()); } /** @@ -674,6 +772,31 @@ public class QueryUtils { } /** + * Discovery history size. + * + * @return Discovery history size. + */ + public static int discoveryHistorySize() { + return DISCO_HIST_SIZE; + } + + /** + * Wrap schema exception if needed. + * + * @param e Original exception. + * @return Schema exception. + */ + @Nullable public static SchemaOperationException wrapIfNeeded(@Nullable Exception e) { + if (e == null) + return null; + + if (e instanceof SchemaOperationException) + return (SchemaOperationException)e; + + return new SchemaOperationException("Unexpected exception.", e); + } + + /** * Private constructor. */ private QueryUtils() { http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java new file mode 100644 index 0000000..f97f931 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; +import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Cache schema change task for exchange worker. + */ +public class SchemaExchangeWorkerTask implements CachePartitionExchangeWorkerTask { + /** Message. */ + private final SchemaAbstractDiscoveryMessage msg; + + /** + * Constructor. + * + * @param msg Message. + */ + public SchemaExchangeWorkerTask(SchemaAbstractDiscoveryMessage msg) { + assert msg != null; + + this.msg = msg; + } + + /** + * @return Message. + */ + public SchemaAbstractDiscoveryMessage message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaExchangeWorkerTask.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java new file mode 100644 index 0000000..3321e66 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Closure that internally applies given {@link SchemaIndexCacheVisitorClosure} to some set of entries. + */ +public interface SchemaIndexCacheVisitor { + /** + * Visit cache entries and pass them to closure. + * + * @param clo Closure. + * @throws IgniteCheckedException If failed. + */ + public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java new file mode 100644 index 0000000..7f50089 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** + * Index closure accepting current entry state. + */ +public interface SchemaIndexCacheVisitorClosure { + /** + * Apply closure. + * + * @param key Key. + * @param part Partition. + * @param val Value. + * @param ver Version. + * @param expiration Expiration. + * @param link Link. + * @throws IgniteCheckedException If failed. + */ + public void apply(KeyCacheObject key, int part, CacheObject val, GridCacheVersion ver, long expiration, long link) + throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java new file mode 100644 index 0000000..58c909d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.Collection; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; + +/** + * Traversor operating all primary and backup partitions of given cache. + */ +public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor { + /** Query procssor. */ + private final GridQueryProcessor qryProc; + + /** Cache context. */ + private final GridCacheContext cctx; + + /** Space name. */ + private final String spaceName; + + /** Table name. */ + private final String tblName; + + /** Cancellation token. */ + private final SchemaIndexOperationCancellationToken cancel; + + /** + * Constructor. + * + * @param cctx Cache context. + * @param spaceName Space name. + * @param tblName Table name. + * @param cancel Cancellation token. + */ + public SchemaIndexCacheVisitorImpl(GridQueryProcessor qryProc, GridCacheContext cctx, String spaceName, + String tblName, SchemaIndexOperationCancellationToken cancel) { + this.qryProc = qryProc; + this.spaceName = spaceName; + this.tblName = tblName; + this.cancel = cancel; + + if (cctx.isNear()) + cctx = ((GridNearCacheAdapter)cctx.cache()).dht().context(); + + this.cctx = cctx; + } + + /** {@inheritDoc} */ + @Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException { + assert clo != null; + + FilteringVisitorClosure filterClo = new FilteringVisitorClosure(clo); + + Collection<GridDhtLocalPartition> parts = cctx.topology().localPartitions(); + + for (GridDhtLocalPartition part : parts) + processPartition(part, filterClo); + } + + /** + * Process partition. + * + * @param part Partition. + * @param clo Index closure. + * @throws IgniteCheckedException If failed. + */ + private void processPartition(GridDhtLocalPartition part, FilteringVisitorClosure clo) + throws IgniteCheckedException { + checkCancelled(); + + boolean reserved = false; + + if (part != null && part.state() != EVICTED) + reserved = (part.state() == OWNING || part.state() == RENTING) && part.reserve(); + + if (!reserved) + return; + + try { + GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor(); + + while (cursor.next()) { + CacheDataRow row = cursor.get(); + + KeyCacheObject key = row.key(); + + processKey(key, row.link(), clo); + } + } + finally { + part.release(); + } + } + + /** + * Process single key. + * + * @param key Key. + * @param link Link. + * @param clo Closure. + * @throws IgniteCheckedException If failed. + */ + private void processKey(KeyCacheObject key, long link, FilteringVisitorClosure clo) throws IgniteCheckedException { + while (true) { + try { + checkCancelled(); + + GridCacheEntryEx entry = cctx.cache().entryEx(key); + + try { + entry.updateIndex(clo, link); + } + finally { + cctx.evicts().touch(entry, AffinityTopologyVersion.NONE); + } + + break; + } + catch (GridCacheEntryRemovedException ignored) { + // No-op. + } + } + } + + /** + * Check if visit process is not cancelled. + * + * @throws IgniteCheckedException If cancelled. + */ + private void checkCancelled() throws IgniteCheckedException { + if (cancel.isCancelled()) + throw new IgniteCheckedException("Index creation was cancelled."); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaIndexCacheVisitorImpl.class, this); + } + + /** + * Filtering visitor closure. + */ + private class FilteringVisitorClosure implements SchemaIndexCacheVisitorClosure { + + /** Target closure. */ + private final SchemaIndexCacheVisitorClosure target; + + /** + * Constructor. + * + * @param target Target. + */ + public FilteringVisitorClosure(SchemaIndexCacheVisitorClosure target) { + this.target = target; + } + + /** {@inheritDoc} */ + @Override public void apply(KeyCacheObject key, int part, CacheObject val, GridCacheVersion ver, + long expiration, long link) throws IgniteCheckedException { + if (qryProc.belongsToTable(cctx, spaceName, tblName, key, val)) + target.apply(key, part, val, ver, expiration, link); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java new file mode 100644 index 0000000..1bc3434 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Index operation cancellation token. + */ +public class SchemaIndexOperationCancellationToken { + /** Cancel flag. */ + private final AtomicBoolean flag = new AtomicBoolean(); + + /** + * Get cancel state. + * + * @return {@code True} if cancelled. + */ + public boolean isCancelled() { + return flag.get(); + } + + /** + * Do cancel. + * + * @return {@code True} if cancel flag was set by this call. + */ + public boolean cancel() { + return flag.compareAndSet(false, true); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaIndexOperationCancellationToken.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java new file mode 100644 index 0000000..3f12b77 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteUuid; + +/** + * Schema key. + */ +public class SchemaKey { + /** Space. */ + private final String space; + + /** Deployment ID. */ + private final IgniteUuid depId; + + /** + * Constructor. + * + * @param space Space. + * @param depId Deployment ID. + */ + public SchemaKey(String space, IgniteUuid depId) { + this.space = space; + this.depId = depId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return 31 * (space != null ? space.hashCode() : 0) + depId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj instanceof SchemaKey) { + SchemaKey other = (SchemaKey)obj; + + return F.eq(space, other.space) && F.eq(depId, other.depId); + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java new file mode 100644 index 0000000..79fbfcd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Node leave exchange worker task. + */ +public class SchemaNodeLeaveExchangeWorkerTask implements CachePartitionExchangeWorkerTask { + /** Node. */ + @GridToStringInclude + private final ClusterNode node; + + /** + * Constructor. + * + * @param node Node. + */ + public SchemaNodeLeaveExchangeWorkerTask(ClusterNode node) { + this.node = node; + } + + /** + * @return Node. + */ + public ClusterNode node() { + return node; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaNodeLeaveExchangeWorkerTask.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java new file mode 100644 index 0000000..6c47aff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * Schema operation client future. + */ +public class SchemaOperationClientFuture extends GridFutureAdapter<Object> { + /** Operation ID. */ + private final UUID opId; + + /** + * Constructor. + * + * @param opId Operation ID. + */ + public SchemaOperationClientFuture(UUID opId) { + this.opId = opId; + } + + /** + * @return Operation ID. + */ + public UUID operationId() { + return opId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaOperationClientFuture.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java new file mode 100644 index 0000000..f0db026 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Schema operation exception. + */ +public class SchemaOperationException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + + /** Code: generic error. */ + public static final int CODE_GENERIC = 0; + + /** Code: cache not found. */ + public static final int CODE_CACHE_NOT_FOUND = 1; + + /** Code: table not found. */ + public static final int CODE_TABLE_NOT_FOUND = 2; + + /** Code: table already exists. */ + public static final int CODE_TABLE_EXISTS = 3; + + /** Code: column not found. */ + public static final int CODE_COLUMN_NOT_FOUND = 4; + + /** Code: column already exists. */ + public static final int CODE_COLUMN_EXISTS = 5; + + /** Code: index not found. */ + public static final int CODE_INDEX_NOT_FOUND = 6; + + /** Code: index already exists. */ + public static final int CODE_INDEX_EXISTS = 7; + + /** Error code. */ + private final int code; + + /** + * Constructor for specific error type. + * + * @param code Code. + * @param objName Object name. + */ + public SchemaOperationException(int code, String objName) { + super(message(code, objName)); + + this.code = code; + } + + /** + * Constructor for generic error. + * + * @param msg Message. + */ + public SchemaOperationException(String msg) { + this(msg, null); + } + + /** + * Constructor for generic error. + * + * @param msg Message. + * @param cause Cause. + */ + public SchemaOperationException(String msg, Throwable cause) { + super(msg, cause); + + code = CODE_GENERIC; + } + + /** + * @return Code. + */ + public int code() { + return code; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaOperationException.class, this, "msg", getMessage()); + } + + /** + * Create message for specific code and object name. + * + * @param code Code. + * @param objName Object name. + * @return Message. + */ + private static String message(int code, String objName) { + switch (code) { + case CODE_CACHE_NOT_FOUND: + return "Cache doesn't exist: " + objName; + + case CODE_TABLE_NOT_FOUND: + return "Table doesn't exist: " + objName; + + case CODE_TABLE_EXISTS: + return "Table already exists: " + objName; + + case CODE_COLUMN_NOT_FOUND: + return "Column doesn't exist: " + objName; + + case CODE_COLUMN_EXISTS: + return "Column already exists: " + objName; + + case CODE_INDEX_NOT_FOUND: + return "Index doesn't exist: " + objName; + + case CODE_INDEX_EXISTS: + return "Index already exists: " + objName; + + default: + assert false; + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java new file mode 100644 index 0000000..eb0f3cd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.UUID; + +/** + * Schema operation manager. + */ +@SuppressWarnings("ThrowableResultOfMethodCallIgnored") +public class SchemaOperationManager { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Query processor. */ + private final GridQueryProcessor qryProc; + + /** Logger. */ + private final IgniteLogger log; + + /** Operation handler. */ + private final SchemaOperationWorker worker; + + /** Mutex for concurrency control. */ + private final Object mux = new Object(); + + /** Participants. */ + private Collection<UUID> nodeIds; + + /** Node results. */ + private Map<UUID, SchemaOperationException> nodeRess; + + /** Current coordinator node. */ + private ClusterNode crd; + + /** Whether coordinator state is mapped. */ + private boolean crdMapped; + + /** Coordinator finished flag. */ + private boolean crdFinished; + + /** + * Constructor. + * + * @param ctx Context. + * @param qryProc Query processor. + * @param worker Operation handler. + * @param crd Coordinator node. + */ + public SchemaOperationManager(GridKernalContext ctx, GridQueryProcessor qryProc, SchemaOperationWorker worker, + @Nullable ClusterNode crd) { + assert !ctx.clientNode() || crd == null; + + this.ctx = ctx; + + log = ctx.log(SchemaOperationManager.class); + + this.qryProc = qryProc; + this.worker = worker; + + synchronized (mux) { + this.crd = crd; + + prepareCoordinator(); + } + } + + /** + * Map operation handling. + */ + @SuppressWarnings("unchecked") + public void start() { + worker.start(); + + synchronized (mux) { + worker.future().listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + onLocalNodeFinished(fut); + } + }); + } + } + + /** + * Handle local node finish. + * + * @param fut Future. + */ + private void onLocalNodeFinished(IgniteInternalFuture fut) { + assert fut.isDone(); + + if (ctx.clientNode()) + return; + + SchemaOperationException err; + + try { + fut.get(); + + err = null; + } + catch (Exception e) { + err = QueryUtils.wrapIfNeeded(e); + } + + synchronized (mux) { + if (isLocalCoordinator()) + onNodeFinished(ctx.localNodeId(), err); + else + qryProc.sendStatusMessage(crd.id(), operationId(), err); + } + } + + /** + * Handle node finish. + * + * @param nodeId Node ID. + * @param err Error. + */ + public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException err) { + synchronized (mux) { + assert isLocalCoordinator(); + + if (nodeRess.containsKey(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Received duplicate result [opId=" + operationId() + ", nodeId=" + nodeId + + ", err=" + err + ']'); + + return; + } + + if (nodeIds.contains(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Received result [opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']'); + + nodeRess.put(nodeId, err); + } + else { + if (log.isDebugEnabled()) + log.debug("Received result from non-tracked node (joined after operation started, will ignore) " + + "[opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']'); + } + + checkFinished(); + } + } + + /** + * Handle node leave event. + * + * @param nodeId ID of the node that has left the grid. + * @param curCrd Current coordinator node. + */ + public void onNodeLeave(UUID nodeId, ClusterNode curCrd) { + synchronized (mux) { + assert crd != null; + + if (F.eq(nodeId, crd.id())) { + // Coordinator has left! + crd = curCrd; + + prepareCoordinator(); + } + else if (isLocalCoordinator()) { + // Other node has left, remove it from the coordinator's wait set. + // Handle this as success. + if (nodeIds.remove(nodeId)) + nodeRess.remove(nodeId); + } + + IgniteInternalFuture fut = worker().future(); + + if (fut.isDone()) + onLocalNodeFinished(fut); + + checkFinished(); + } + } + + /** + * Check if operation finished. + */ + private void checkFinished() { + assert Thread.holdsLock(mux); + + if (isLocalCoordinator()) { + if (crdFinished) + return; + + if (nodeIds.size() == nodeRess.size()) { + // Initiate finish request. + SchemaOperationException err = null; + + for (Map.Entry<UUID, SchemaOperationException> nodeRes : nodeRess.entrySet()) { + if (nodeRes.getValue() != null) { + err = nodeRes.getValue(); + + break; + } + } + + if (log.isDebugEnabled()) + log.debug("Collected all results, about to send finish message [opId=" + operationId() + + ", err=" + err + ']'); + + crdFinished = true; + + qryProc.onCoordinatorFinished(worker.operation(), err); + } + } + } + + /** + * Prepare topology state in case local node is coordinator. + * + * @return {@code True} if state was changed by this call. + */ + private boolean prepareCoordinator() { + if (isLocalCoordinator() && !crdMapped) { + // Initialize local structures. + nodeIds = new HashSet<>(); + nodeRess = new HashMap<>(); + + for (ClusterNode alive : ctx.discovery().aliveServerNodes()) + nodeIds.add(alive.id()); + + if (log.isDebugEnabled()) + log.debug("Mapped participating nodes on coordinator [opId=" + operationId() + + ", crdNodeId=" + ctx.localNodeId() + ", nodes=" + nodeIds + ']'); + + crdMapped = true; + + return true; + } + + return false; + } + + /** + * Check if current node is local coordinator. + * + * @return {@code True} if coordinator. + */ + private boolean isLocalCoordinator() { + assert Thread.holdsLock(mux); + + return crd != null && crd.isLocal(); + } + + /** + * @return Worker. + */ + public SchemaOperationWorker worker() { + return worker; + } + + /** + * @return Operation ID. + */ + private UUID operationId() { + return worker.operation().id(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java new file mode 100644 index 0000000..06feecb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Schema operation executor. + */ +public class SchemaOperationWorker extends GridWorker { + /** Query processor */ + private final GridQueryProcessor qryProc; + + /** Deployment ID. */ + private final IgniteUuid depId; + + /** Target operation. */ + private final SchemaAbstractOperation op; + + /** No-op flag. */ + private final boolean nop; + + /** Whether cache started. */ + private final boolean cacheRegistered; + + /** Type descriptor. */ + private final QueryTypeDescriptorImpl type; + + /** Operation future. */ + private final GridFutureAdapter fut; + + /** Public operation future. */ + private final GridFutureAdapter pubFut; + + /** Start guard. */ + private final AtomicBoolean startGuard = new AtomicBoolean(); + + /** Cancellation token. */ + private final SchemaIndexOperationCancellationToken cancelToken = new SchemaIndexOperationCancellationToken(); + + /** + * Constructor. + * + * @param ctx Context. + * @param qryProc Query processor. + * @param depId Deployment ID. + * @param op Target operation. + * @param nop No-op flag. + * @param err Predefined error. + * @param cacheRegistered Whether cache is registered in indexing at this point. + * @param type Type descriptor (if available). + */ + public SchemaOperationWorker(GridKernalContext ctx, GridQueryProcessor qryProc, IgniteUuid depId, + SchemaAbstractOperation op, boolean nop, @Nullable SchemaOperationException err, boolean cacheRegistered, + @Nullable QueryTypeDescriptorImpl type) { + super(ctx.igniteInstanceName(), workerName(op), ctx.log(SchemaOperationWorker.class)); + + this.qryProc = qryProc; + this.depId = depId; + this.op = op; + this.nop = nop; + this.cacheRegistered = cacheRegistered; + this.type = type; + + fut = new GridFutureAdapter(); + + if (err != null) + fut.onDone(err); + else if (nop || !cacheRegistered) + fut.onDone(); + + pubFut = publicFuture(fut); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + try { + // Execute. + qryProc.processIndexOperationLocal(op, type, depId, cancelToken); + + fut.onDone(); + } + catch (Exception e) { + fut.onDone(QueryUtils.wrapIfNeeded(e)); + } + } + + /** + * Perform initialization routine. + * + * @return This instance. + */ + public SchemaOperationWorker start() { + if (startGuard.compareAndSet(false, true)) { + if (!fut.isDone()) + new IgniteThread(this).start(); + } + + return this; + } + + /** + * Chain the future making sure that operation is completed after local schema is updated. + * + * @param fut Current future. + * @return Chained future. + */ + @SuppressWarnings("unchecked") + private GridFutureAdapter<?> publicFuture(GridFutureAdapter fut) { + final GridFutureAdapter<?> chainedFut = new GridFutureAdapter<>(); + + fut.listen(new IgniteInClosure<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + Exception err = null; + + try { + fut.get(); + + if (cacheRegistered && !nop) + qryProc.onLocalOperationFinished(op, type); + } + catch (Exception e) { + err = e; + } + finally { + chainedFut.onDone(null, err); + } + } + }); + + return chainedFut; + } + + /** + * @return No-op flag. + */ + public boolean nop() { + return nop; + } + + /** + * @return Whether cache is registered. + */ + public boolean cacheRegistered() { + return cacheRegistered; + } + + /** + * Cancel operation. + */ + public void cancel() { + if (cancelToken.cancel()) + super.cancel(); + } + + /** + * @return Operation. + */ + public SchemaAbstractOperation operation() { + return op; + } + + /** + * @return Future completed when operation is ready. + */ + public IgniteInternalFuture future() { + return pubFut; + } + + /** + * @return Worker name. + */ + private static String workerName(SchemaAbstractOperation op) { + return "schema-op-worker-" + op.id(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java new file mode 100644 index 0000000..9fdc6c3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; + +/** + * Abstract discovery message for schema operations. + */ +public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** ID */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** Operation. */ + @GridToStringInclude + protected final SchemaAbstractOperation op; + + /** + * Constructor. + * + * @param op Operation. + */ + protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) { + this.op = op; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** + * @return Operation. + */ + public SchemaAbstractOperation operation() { + return op; + } + + /** + * @return Whether request must be propagated to exchange thread. + */ + public abstract boolean exchange(); + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaAbstractDiscoveryMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java new file mode 100644 index 0000000..2245b24 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Schema change finish discovery message. + */ +public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Error. */ + private final SchemaOperationException err; + + /** Original propose message. */ + private transient SchemaProposeDiscoveryMessage proposeMsg; + + /** + * Constructor. + * + * @param op Original operation. + * @param err Error. + */ + public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationException err) { + super(op); + + this.err = err; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean exchange() { + return false; + } + + /** + * @return {@code True} if error was reported during init. + */ + public boolean hasError() { + return err != null; + } + + /** + * @return Error message (if any). + */ + @Nullable public SchemaOperationException error() { + return err; + } + + /** + * @return Propose message. + */ + public SchemaProposeDiscoveryMessage proposeMessage() { + return proposeMsg; + } + + /** + * @param proposeMsg Propose message. + */ + public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) { + this.proposeMsg = proposeMsg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java new file mode 100644 index 0000000..5f75e60 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +import java.nio.ByteBuffer; +import java.util.UUID; + +/** + * Schema operation status message. + */ +public class SchemaOperationStatusMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Operation ID. */ + private UUID opId; + + /** Error bytes (if any). */ + private byte[] errBytes; + + /** Sender node ID. */ + @GridDirectTransient + private UUID sndNodeId; + + /** + * Default constructor. + */ + public SchemaOperationStatusMessage() { + // No-op. + } + + /** + * Constructor. + * + * @param opId Operation ID. + * @param errBytes Error bytes. + */ + public SchemaOperationStatusMessage(UUID opId, byte[] errBytes) { + this.opId = opId; + this.errBytes = errBytes; + } + + /** + * @return Operation ID. + */ + public UUID operationId() { + return opId; + } + + /** + * @return Error bytes. + */ + @Nullable public byte[] errorBytes() { + return errBytes; + } + + /** + * @return Sender node ID. + */ + public UUID senderNodeId() { + return sndNodeId; + } + + /** + * @param sndNodeId Sender node ID. + */ + public void senderNodeId(UUID sndNodeId) { + this.sndNodeId = sndNodeId; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeUuid("opId", opId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + opId = reader.readUuid("opId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(SchemaOperationStatusMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -53; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaOperationStatusMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java new file mode 100644 index 0000000..664ee03 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.message; + +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.query.schema.SchemaKey; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; +import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * Schema change propose discovery message. + */ +public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache deployment ID. */ + private IgniteUuid depId; + + /** Error. */ + private SchemaOperationException err; + + /** Whether to perform exchange. */ + private transient boolean exchange; + + /** + * Constructor. + * + * @param op Operation. + */ + public SchemaProposeDiscoveryMessage(SchemaAbstractOperation op) { + super(op); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean exchange() { + return exchange; + } + + /** + * @param exchange Whether to perform exchange. + */ + public void exchange(boolean exchange) { + this.exchange = exchange; + } + + /** + * @return Deployment ID. + */ + @Nullable public IgniteUuid deploymentId() { + return depId; + } + + /** + * @param depId Deployment ID. + */ + public void deploymentId(IgniteUuid depId) { + this.depId = depId; + } + + /** + * + * @return {@code True} if message is initialized. + */ + public boolean initialized() { + return deploymentId() != null || hasError(); + } + + /** + * Set error. + * + * @param err Error. + */ + public void onError(SchemaOperationException err) { + if (!hasError()) { + this.err = err; + } + } + + /** + * @return {@code True} if error was reported during init. + */ + public boolean hasError() { + return err != null; + } + + /** + * @return Error message (if any). + */ + @Nullable public SchemaOperationException error() { + return err; + } + + /** + * @return Schema key. + */ + public SchemaKey schemaKey() { + return new SchemaKey(operation().space(), depId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java new file mode 100644 index 0000000..8418ece --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.operation; + +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Abstract operation on schema. + */ +public abstract class SchemaAbstractOperation implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Operation ID. */ + private final UUID opId; + + /** Space. */ + private final String space; + + /** + * Constructor. + * + * @param opId Operation ID. + * @param space Space. + */ + public SchemaAbstractOperation(UUID opId, String space) { + this.opId = opId; + this.space = space; + } + + /** + * @return Operation id. + */ + public UUID id() { + return opId; + } + + /** + * @return Space. + */ + public String space() { + return space; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaAbstractOperation.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java new file mode 100644 index 0000000..fc4a9ff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.operation; + +import java.util.UUID; + +/** + * Schema index abstract operation. + */ +public abstract class SchemaIndexAbstractOperation extends SchemaAbstractOperation { + /** + * Constructor. + * + * @param opId Operation ID. + * @param space Space. + */ + public SchemaIndexAbstractOperation(UUID opId, String space) { + super(opId, space); + } + + /** + * @return Index name. + */ + public abstract String indexName(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java new file mode 100644 index 0000000..9281f2a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.schema.operation; + +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * Schema index create operation. + */ +public class SchemaIndexCreateOperation extends SchemaIndexAbstractOperation { + /** */ + private static final long serialVersionUID = 0L; + + /** Table name. */ + private final String tblName; + + /** Index. */ + @GridToStringInclude + private final QueryIndex idx; + + /** Ignore operation if index exists. */ + private final boolean ifNotExists; + + /** + * Constructor. + * + * @param opId Operation id. + * @param space Space. + * @param tblName Table name. + * @param idx Index params. + * @param ifNotExists Ignore operation if index exists. + */ + public SchemaIndexCreateOperation(UUID opId, String space, String tblName, QueryIndex idx, boolean ifNotExists) { + super(opId, space); + + this.tblName = tblName; + this.idx = idx; + this.ifNotExists = ifNotExists; + } + + /** {@inheritDoc} */ + @Override public String indexName() { + return QueryUtils.indexName(tblName, idx); + } + + /** + * @return Table name. + */ + public String tableName() { + return tblName; + } + + /** + * @return Index params. + */ + public QueryIndex index() { + return idx; + } + + /** + * @return Ignore operation if index exists. + */ + public boolean ifNotExists() { + return ifNotExists; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SchemaIndexCreateOperation.class, this, "parent", super.toString()); + } +}