http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java new file mode 100644 index 0000000..1be7cd8 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstanceExists.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.TableOperations; + +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.client.InstanceExists; +import mvm.rya.api.client.RyaClientException; + +/** + * An Accumulo implementation of the {@link InstanceExists} command. 
+ */ +@ParametersAreNonnullByDefault +public class AccumuloInstanceExists extends AccumuloCommand implements InstanceExists { + + /** + * Constructs an insatnce of {@link AccumuloInstanceExists}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloInstanceExists(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + } + + @Override + public boolean exists(final String instanceName) throws RyaClientException { + requireNonNull( instanceName ); + + final TableOperations tableOps = getConnector().tableOperations(); + + // Newer versions of Rya will have a Rya Details table. + final String ryaDetailsTableName = instanceName + AccumuloRyaInstanceDetailsRepository.INSTANCE_DETAILS_TABLE_NAME; + if(tableOps.exists(ryaDetailsTableName)) { + return true; + } + + // However, older versions only have the data tables. + final String spoTableName = instanceName + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; + final String posTableName = instanceName + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; + final String ospTableName = instanceName + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; + if(tableOps.exists(spoTableName) && tableOps.exists(posTableName) && tableOps.exists(ospTableName)) { + return true; + } + + return false; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java new file mode 100644 index 0000000..86d96b8 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloListInstances.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.client.ListInstances; +import mvm.rya.api.client.RyaClientException; + +/** + * An Accumulo implementation of the {@link ListInstances} command. 
+ */ +@ParametersAreNonnullByDefault +public class AccumuloListInstances extends AccumuloCommand implements ListInstances { + + private final Pattern spoPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + private final Pattern ospPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + private final Pattern poPattern = Pattern.compile("(.*)" + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + + /** + * Constructs an instance of {@link AccumuloListInstances}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloListInstances(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + } + + @Override + public List<String> listInstances() throws RyaClientException { + // Figure out what the instance names might be. + final Map<String, InstanceTablesFound> proposedInstanceNames = new HashMap<>(); + + for(final String table : getConnector().tableOperations().list()) { + // SPO table + final Matcher spoMatcher = spoPattern.matcher(table); + if(spoMatcher.find()) { + final String instanceName = spoMatcher.group(1); + makeOrGetInstanceTables(proposedInstanceNames, instanceName).setSpoFound(); + } + + // OSP table + final Matcher ospMatcher = ospPattern.matcher(table); + if(ospMatcher.find()) { + final String instanceName = ospMatcher.group(1); + makeOrGetInstanceTables(proposedInstanceNames, instanceName).setOspFound(); + } + + // PO table + final Matcher poMatcher = poPattern.matcher(table); + if(poMatcher.find()) { + final String instanceName = poMatcher.group(1); + makeOrGetInstanceTables(proposedInstanceNames, instanceName).setPoFound(); + } + } + + // Determine which of the proposed names fit the expected Rya table structures. 
+ final List<String> instanceNames = new ArrayList<>(); + for(final Entry<String, InstanceTablesFound> entry : proposedInstanceNames.entrySet()) { + final InstanceTablesFound tables = entry.getValue(); + if(tables.allFlagsSet()) { + instanceNames.add(entry.getKey()); + } + } + + return instanceNames; + } + + private InstanceTablesFound makeOrGetInstanceTables(final Map<String, InstanceTablesFound> lookup, final String instanceName) { + if(!lookup.containsKey(instanceName)) { + lookup.put(instanceName, new InstanceTablesFound()); + } + return lookup.get(instanceName); + } + + /** + * Flags that are used to determine if a String is a Rya Instance name. + */ + @ParametersAreNonnullByDefault + private static class InstanceTablesFound { + private boolean spoFound = false; + private boolean ospFound = false; + private boolean poFound = false; + + /** + * Sets the SPO table as seen. + */ + public void setSpoFound() { + spoFound = true; + } + + /** + * Sets the OSP table as seen. + */ + public void setOspFound() { + ospFound = true; + } + + /** + * Sets the POS table as seen. + */ + public void setPoFound() { + poFound = true; + } + + /** + * @return {@code true} if all of the flags have been set; otherwise {@code false}. 
+ */ + public boolean allFlagsSet() { + return spoFound && ospFound && poFound; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java new file mode 100644 index 0000000..8c276a8 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +import mvm.rya.api.client.RyaClient; + +/** + * Constructs instance of {@link RyaClient} that are connected to instance of + * Rya hosted by Accumulo clusters. 
+ */ +@ParametersAreNonnullByDefault +public class AccumuloRyaClientFactory { + + /** + * Initialize a set of {@link RyaClient} that will interact with an instance of + * Rya that is hosted by an Accumulo cluster. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - The Accumulo connector the commands will use. (not null) + * @return The initialized commands. + */ + public static RyaClient build( + final AccumuloConnectionDetails connectionDetails, + final Connector connector) { + requireNonNull(connectionDetails); + requireNonNull(connector); + + // Build the RyaCommands option with the initialized commands. + return new RyaClient( + new AccumuloInstall(connectionDetails, connector), + new AccumuloCreatePCJ(connectionDetails, connector), + new AccumuloDeletePCJ(connectionDetails, connector), + new AccumuloGetInstanceDetails(connectionDetails, connector), + new AccumuloInstanceExists(connectionDetails, connector), + new AccumuloListInstances(connectionDetails, connector)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java new file mode 100644 index 0000000..b80abfa --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/FluoClientFactory.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.FluoFactory; +import io.fluo.api.config.FluoConfiguration; + +/** + * Creates {@link FluoClient}s that are connected to a specific Fluo Application. + */ +@ParametersAreNonnullByDefault +public class FluoClientFactory { + + /** + * Create a {@link FluoClient} that uses the provided connection details. + * + * @param username - The username the connection will use. (not null) + * @param password - The password the connection will use. (not null) + * @param instanceName - The name of the Accumulo instance. (not null) + * @param zookeeperHostnames - A comma delimited list of the Zookeeper server hostnames. (not null) + * @param fluoAppName - The Fluo Application the client will be connected to. (not null) + * @return A {@link FluoClient} that may be used to access the Fluo Application. 
+ */ + public FluoClient connect( + final String username, + final String password, + final String instanceName, + final String zookeeperHostnames, + final String fluoAppName) { + requireNonNull(username); + requireNonNull(password); + requireNonNull(instanceName); + requireNonNull(zookeeperHostnames); + requireNonNull(fluoAppName); + + final FluoConfiguration fluoConfig = new FluoConfiguration(); + + // Fluo configuration values. + fluoConfig.setApplicationName( fluoAppName ); + fluoConfig.setInstanceZookeepers( zookeeperHostnames + "/fluo" ); + + // Accumulo Connection Stuff. + fluoConfig.setAccumuloZookeepers( zookeeperHostnames ); + fluoConfig.setAccumuloInstance( instanceName ); + fluoConfig.setAccumuloUser( username ); + fluoConfig.setAccumuloPassword( password ); + + // Connect the client. + return FluoFactory.newClient(fluoConfig); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java index 0324a79..311b745 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinIndexer.java @@ -28,21 +28,13 @@ import java.util.Set; import javax.annotation.ParametersAreNonnullByDefault; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; -import 
mvm.rya.indexing.external.fluo.PcjUpdaterSupplierFactory; - import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater.PcjUpdateException; import org.openrdf.model.URI; @@ -50,6 +42,14 @@ import org.openrdf.model.URI; import com.google.common.base.Optional; import com.google.common.base.Supplier; +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier; +import mvm.rya.indexing.external.fluo.PcjUpdaterSupplierFactory; + /** * Updates the state of the Precomputed Join indices that are used by Rya. */ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java deleted file mode 100644 index 6e79748..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorage.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package mvm.rya.indexing.external.accumulo; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.apache.accumulo.core.client.Connector; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -/** - * An Accumulo backed implementation of {@link PrecomputedJoinStorage}. - */ -@ParametersAreNonnullByDefault -public class AccumuloPcjStorage implements PrecomputedJoinStorage { - - private final PcjTableNameFactory pcjIdFactory = new PcjTableNameFactory(); - private final PcjTables pcjTables = new PcjTables(); - - private final Connector accumuloConn; - private final String ryaInstanceName; - - /** - * Constructs an instance of {@link AccumuloPcjStorage}. - * - * @param accumuloConn - The connector that will be used to connect to Accumulo. 
(not null) - * @param ryaInstanceName - The name of the RYA instance that will be accessed. (not null) - */ - public AccumuloPcjStorage(final Connector accumuloConn, final String ryaInstanceName) { - this.accumuloConn = checkNotNull(accumuloConn); - this.ryaInstanceName = checkNotNull(ryaInstanceName); - } - - @Override - public List<String> listPcjs() throws PCJStorageException { - return pcjTables.listPcjTables(accumuloConn, ryaInstanceName); - } - - @Override - public String createPcj(final String sparql, final Set<VariableOrder> varOrders) throws PCJStorageException { - final String pcjId = pcjIdFactory.makeTableName(ryaInstanceName); - pcjTables.createPcjTable(accumuloConn, pcjId, varOrders, sparql); - return pcjId; - } - - @Override - public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException { - return pcjTables.getPcjMetadata(accumuloConn, pcjId); - } - - @Override - public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException { - pcjTables.addResults(accumuloConn, pcjId, results); - } - - @Override - public void purge(final String pcjId) throws PCJStorageException { - pcjTables.purgePcjTable(accumuloConn, pcjId); - } - - @Override - public void dropPcj(final String pcjId) throws PCJStorageException { - pcjTables.dropPcjTable(accumuloConn, pcjId); - } - - @Override - public void close() throws PCJStorageException { - // Accumulo Connectors don't require closing. 
- } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java index 73b50db..3f4806e 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageConfig.java @@ -21,6 +21,7 @@ package mvm.rya.indexing.external.accumulo; import static com.google.common.base.Preconditions.checkNotNull; import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import mvm.rya.api.RdfCloudTripleStoreConfiguration; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java index 77b8f2e..b6d4121 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/accumulo/AccumuloPcjStorageSupplier.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.accumulo.core.client.Connector; import org.apache.hadoop.conf.Configuration; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import com.google.common.base.Optional; import 
com.google.common.base.Supplier; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java index 8b56018..04b6d2d 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSet.java @@ -18,8 +18,6 @@ */ package mvm.rya.indexing.external.tupleSet; -import info.aduna.iteration.CloseableIteration; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -30,17 +28,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; -import mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator; -import mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator.HashJoinType; -import mvm.rya.accumulo.pcj.iterators.IteratorCombiner; -import mvm.rya.accumulo.pcj.iterators.PCJKeyToCrossProductBindingSetIterator; -import mvm.rya.accumulo.pcj.iterators.PCJKeyToJoinBindingSetIterator; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.utils.IteratorWrapper; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.pcj.matching.PCJOptimizerUtilities; -import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; @@ -80,6 +67,18 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import info.aduna.iteration.CloseableIteration; +import 
mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator; +import mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator.HashJoinType; +import mvm.rya.accumulo.pcj.iterators.IteratorCombiner; +import mvm.rya.accumulo.pcj.iterators.PCJKeyToCrossProductBindingSetIterator; +import mvm.rya.accumulo.pcj.iterators.PCJKeyToJoinBindingSetIterator; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.utils.IteratorWrapper; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.pcj.matching.PCJOptimizerUtilities; +import mvm.rya.rdftriplestore.evaluation.ExternalBatchingIterator; + /** * During query planning, this node is inserted into the parsed query to * represent part of the original query (a sub-query). This sub-query is the @@ -108,13 +107,13 @@ import com.google.common.collect.Sets; public class AccumuloIndexSet extends ExternalTupleSet implements ExternalBatchingIterator { - private Connector accCon; // connector to Accumulo table where results + private final Connector accCon; // connector to Accumulo table where results // are stored - private String tablename; // name of Accumulo table + private final String tablename; // name of Accumulo table private List<String> varOrder = null; // orders in which results are written // to table - private PcjTables pcj = new PcjTables(); - private Authorizations auths; + private final PcjTables pcj = new PcjTables(); + private final Authorizations auths; @Override @@ -143,21 +142,21 @@ public class AccumuloIndexSet extends ExternalTupleSet implements * @throws TableNotFoundException * @throws AccumuloSecurityException * @throws AccumuloException - * @throws PCJStorageException + * @throws PCJStorageException */ - public AccumuloIndexSet(String sparql, Configuration conf, - String tablename) throws MalformedQueryException, SailException, + public AccumuloIndexSet(final String sparql, final Configuration conf, + final String tablename) throws MalformedQueryException, SailException, 
QueryEvaluationException, TableNotFoundException, AccumuloException, AccumuloSecurityException, PCJStorageException { this.tablename = tablename; this.accCon = ConfigUtils.getConnector(conf); this.auths = getAuthorizations(conf); - SPARQLParser sp = new SPARQLParser(); - ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); - TupleExpr te = pq.getTupleExpr(); + final SPARQLParser sp = new SPARQLParser(); + final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); + final TupleExpr te = pq.getTupleExpr(); Preconditions.checkArgument(PCJOptimizerUtilities.isPCJValid(te), "TupleExpr is an invalid PCJ."); - Optional<Projection> projection = new ParsedQueryUtil() + final Optional<Projection> projection = new ParsedQueryUtil() .findProjection(pq); if (!projection.isPresent()) { throw new MalformedQueryException("SPARQL query '" + sparql @@ -188,7 +187,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements * @throws AccumuloSecurityException * @throws AccumuloException */ - public AccumuloIndexSet(Configuration conf, String tablename) + public AccumuloIndexSet(final Configuration conf, final String tablename) throws MalformedQueryException, SailException, QueryEvaluationException, TableNotFoundException, AccumuloException, AccumuloSecurityException { this.accCon = ConfigUtils.getConnector(conf); @@ -201,12 +200,11 @@ public class AccumuloIndexSet extends ExternalTupleSet implements } this.tablename = tablename; - SPARQLParser sp = new SPARQLParser(); - ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery( - meta.getSparql(), null); + final SPARQLParser sp = new SPARQLParser(); + final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(meta.getSparql(), null); setProjectionExpr((Projection) pq.getTupleExpr()); - Set<VariableOrder> orders = meta.getVarOrders(); + final Set<VariableOrder> orders = meta.getVarOrders(); varOrder = Lists.newArrayList(); for (final VariableOrder var : orders) { @@ -217,7 +215,7 @@ public 
class AccumuloIndexSet extends ExternalTupleSet implements } - private Authorizations getAuthorizations(Configuration conf) { + private Authorizations getAuthorizations(final Configuration conf) { final String authString = conf.get(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, ""); if (authString.isEmpty()) { return new Authorizations(); @@ -234,7 +232,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements try { cardinality = pcj.getPcjMetadata(accCon, tablename) .getCardinality(); - } catch (PcjException e) { + } catch (final PcjException e) { e.printStackTrace(); } return cardinality; @@ -251,8 +249,8 @@ public class AccumuloIndexSet extends ExternalTupleSet implements * usually the variable orders in the table so that scans for * specific orders are more efficient */ - private void setLocalityGroups(String tableName, Connector conn, - List<String> groups) { + private void setLocalityGroups(final String tableName, final Connector conn, + final List<String> groups) { final HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>(); for (int i = 0; i < groups.size(); i++) { @@ -273,7 +271,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements @Override public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( - BindingSet bindingset) throws QueryEvaluationException { + final BindingSet bindingset) throws QueryEvaluationException { return this.evaluate(Collections.singleton(bindingset)); } @@ -296,26 +294,26 @@ public class AccumuloIndexSet extends ExternalTupleSet implements new HashSet<BindingSet>().iterator()); } - List<BindingSet> crossProductBs = new ArrayList<>(); - Map<String, org.openrdf.model.Value> constantConstraints = new HashMap<>(); - Set<Range> hashJoinRanges = new HashSet<>(); + final List<BindingSet> crossProductBs = new ArrayList<>(); + final Map<String, org.openrdf.model.Value> constantConstraints = new HashMap<>(); + final Set<Range> hashJoinRanges = new HashSet<>(); final 
Range EMPTY_RANGE = new Range("", true, "~", false); Range crossProductRange = EMPTY_RANGE; String localityGroupOrder = varOrder.get(0); int maxPrefixLen = Integer.MIN_VALUE; int prefixLen = 0; int oldPrefixLen = 0; - Multimap<String, BindingSet> bindingSetHashMap = HashMultimap.create(); + final Multimap<String, BindingSet> bindingSetHashMap = HashMultimap.create(); HashJoinType joinType = HashJoinType.CONSTANT_JOIN_VAR; - Set<String> unAssuredVariables = Sets.difference(getTupleExpr().getBindingNames(), getTupleExpr().getAssuredBindingNames()); + final Set<String> unAssuredVariables = Sets.difference(getTupleExpr().getBindingNames(), getTupleExpr().getAssuredBindingNames()); boolean useColumnScan = false; boolean isCrossProd = false; boolean containsConstantConstraints = false; - BindingSet constants = getConstantConstraints(); + final BindingSet constants = getConstantConstraints(); containsConstantConstraints = constants.size() > 0; try { - for (BindingSet bs : bindingset) { + for (final BindingSet bs : bindingset) { if (bindingset.size() == 1 && bs.size() == 0) { // in this case, only single, empty bindingset, pcj node is // first node in query plan - use full Range scan with @@ -325,9 +323,9 @@ public class AccumuloIndexSet extends ExternalTupleSet implements } // get common vars for PCJ - only use variables associated // with assured Bindings - QueryBindingSet commonVars = new QueryBindingSet(); - for (String b : getTupleExpr().getAssuredBindingNames()) { - Binding v = bs.getBinding(b); + final QueryBindingSet commonVars = new QueryBindingSet(); + for (final String b : getTupleExpr().getAssuredBindingNames()) { + final Binding v = bs.getBinding(b); if (v != null) { commonVars.addBinding(v); } @@ -339,7 +337,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements } //get a varOrder from orders in PCJ table - use at least //one common variable - BindingSetVariableOrder varOrder = getVarOrder( + final BindingSetVariableOrder varOrder = 
getVarOrder( commonVars.getBindingNames(), constants.getBindingNames()); @@ -348,8 +346,8 @@ public class AccumuloIndexSet extends ExternalTupleSet implements // variables commonVars.addAll(constants); if (commonVars.size() > varOrder.varOrderLen) { - Map<String, Value> valMap = getConstantValueMap(); - for (String s : new HashSet<String>(varOrder.unusedVars)) { + final Map<String, Value> valMap = getConstantValueMap(); + for (final String s : new HashSet<String>(varOrder.unusedVars)) { if (valMap.containsKey(s) && !constantConstraints.containsKey(s)) { constantConstraints.put(s, valMap.get(s)); @@ -393,7 +391,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements if (prefixLen > maxPrefixLen) { maxPrefixLen = prefixLen; } - String key = getHashJoinKey(varOrder.varOrder, commonVars); + final String key = getHashJoinKey(varOrder.varOrder, commonVars); bindingSetHashMap.put(key, bs); } @@ -405,7 +403,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements // BindingSets if ((useColumnScan || crossProductBs.size() > 0) && bindingSetHashMap.size() == 0) { - Scanner scanner = accCon.createScanner(tablename, auths); + final Scanner scanner = accCon.createScanner(tablename, auths); // cross product with no cross product constraints here scanner.setRange(crossProductRange); scanner.fetchColumnFamily(new Text(localityGroupOrder)); @@ -419,10 +417,10 @@ public class AccumuloIndexSet extends ExternalTupleSet implements // create an iterator to evaluate cross product and an iterator // for hash join, then combine - List<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorList = new ArrayList<>(); + final List<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorList = new ArrayList<>(); // create cross product iterator - Scanner scanner1 = accCon.createScanner(tablename, auths); + final Scanner scanner1 = accCon.createScanner(tablename, auths); scanner1.setRange(crossProductRange); scanner1.fetchColumnFamily(new 
Text(localityGroupOrder)); iteratorList.add(new PCJKeyToCrossProductBindingSetIterator( @@ -430,9 +428,9 @@ public class AccumuloIndexSet extends ExternalTupleSet implements getTableVarMap())); // create hash join iterator - BatchScanner scanner2 = accCon.createBatchScanner(tablename, auths, 10); + final BatchScanner scanner2 = accCon.createBatchScanner(tablename, auths, 10); scanner2.setRanges(hashJoinRanges); - PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator( + final PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator( scanner2, getTableVarMap(), maxPrefixLen); iteratorList.add(new BindingSetHashJoinIterator( bindingSetHashMap, iterator, unAssuredVariables, joinType)); @@ -442,21 +440,21 @@ public class AccumuloIndexSet extends ExternalTupleSet implements } else { // only hash join BindingSets exist - BatchScanner scanner = accCon.createBatchScanner(tablename, auths, 10); + final BatchScanner scanner = accCon.createBatchScanner(tablename, auths, 10); // only need to create hash join iterator scanner.setRanges(hashJoinRanges); - PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator( + final PCJKeyToJoinBindingSetIterator iterator = new PCJKeyToJoinBindingSetIterator( scanner, getTableVarMap(), maxPrefixLen); return new BindingSetHashJoinIterator(bindingSetHashMap, iterator, unAssuredVariables, joinType); } - } catch (Exception e) { + } catch (final Exception e) { throw new QueryEvaluationException(e); } } - private String getHashJoinKey(String commonVarOrder, BindingSet bs) { - String[] commonVarArray = commonVarOrder.split(VAR_ORDER_DELIM); + private String getHashJoinKey(final String commonVarOrder, final BindingSet bs) { + final String[] commonVarArray = commonVarOrder.split(VAR_ORDER_DELIM); String key = bs.getValue(commonVarArray[0]).toString(); for (int i = 1; i < commonVarArray.length; i++) { key = key + VALUE_DELIM + bs.getValue(commonVarArray[i]).toString(); @@ -464,9 +462,9 @@ 
public class AccumuloIndexSet extends ExternalTupleSet implements return key; } - private Range getRange(String commonVarOrder, BindingSet bs) + private Range getRange(final String commonVarOrder, final BindingSet bs) throws BindingSetConversionException { - AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); byte[] rangePrefix = new byte[0]; rangePrefix = converter.convert(bs, new VariableOrder(commonVarOrder)); return Range.prefix(new Text(rangePrefix)); @@ -483,10 +481,10 @@ public class AccumuloIndexSet extends ExternalTupleSet implements * and constantBindingNames */ private BindingSetVariableOrder getVarOrder( - Set<String> variableBindingNames, Set<String> constantBindingNames) { - Map<String, Set<String>> varOrderMap = this + final Set<String> variableBindingNames, final Set<String> constantBindingNames) { + final Map<String, Set<String>> varOrderMap = this .getSupportedVariableOrders(); - Set<Map.Entry<String, Set<String>>> entries = varOrderMap.entrySet(); + final Set<Map.Entry<String, Set<String>>> entries = varOrderMap.entrySet(); Set<String> variables; if (variableBindingNames.size() == 0 @@ -505,8 +503,8 @@ public class AccumuloIndexSet extends ExternalTupleSet implements int maxPrefixLen = 0; Set<String> minUnusedVariables = null; - for (Map.Entry<String, Set<String>> e : entries) { - Set<String> value = e.getValue(); + for (final Map.Entry<String, Set<String>> e : entries) { + final Set<String> value = e.getValue(); if (maxPrefixLen < value.size() && variables.containsAll(value) && Sets.intersection(value, variableBindingNames) @@ -527,8 +525,8 @@ public class AccumuloIndexSet extends ExternalTupleSet implements int maxPrefixLen = 0; Set<String> minUnusedVariables = null; - for (Map.Entry<String, Set<String>> e : entries) { - Set<String> value = e.getValue(); + for (final Map.Entry<String, Set<String>> e : entries) { + final Set<String> value = e.getValue(); if 
(maxPrefixLen < value.size() && variables.containsAll(value)) { maxPrefixLen = value.size(); maxPrefix = e.getKey(); @@ -548,11 +546,11 @@ public class AccumuloIndexSet extends ExternalTupleSet implements * constant, but are non-constant in Accumulo table */ private BindingSet getConstantConstraints() { - Map<String, String> tableMap = this.getTableVarMap(); - Set<String> keys = tableMap.keySet(); + final Map<String, String> tableMap = this.getTableVarMap(); + final Set<String> keys = tableMap.keySet(); - QueryBindingSet constants = new QueryBindingSet(); - for (String s : keys) { + final QueryBindingSet constants = new QueryBindingSet(); + for (final String s : keys) { if (s.startsWith("-const-")) { constants.addBinding(new BindingImpl(s, getConstantValueMap() .get(s))); @@ -572,7 +570,7 @@ public class AccumuloIndexSet extends ExternalTupleSet implements //if converted partial order is a prefix, convert corresponding full PCJ var order to query vars private String prefixToOrder(String order) { final Map<String, String> invMap = HashBiMap.create(this.getTableVarMap()).inverse(); - String[] temp = order.split(VAR_ORDER_DELIM); + final String[] temp = order.split(VAR_ORDER_DELIM); //get order in terms of PCJ variables for (int i = 0; i < temp.length; i++) { temp[i] = this.getTableVarMap().get(temp[i]); @@ -594,8 +592,8 @@ public class AccumuloIndexSet extends ExternalTupleSet implements int varOrderLen = 0; String varOrder; - public BindingSetVariableOrder(String varOrder, int len, - Set<String> unused) { + public BindingSetVariableOrder(final String varOrder, final int len, + final Set<String> unused) { this.varOrder = varOrder; this.varOrderLen = len; this.unusedVars = unused; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java ---------------------------------------------------------------------- diff --git 
a/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java index 3ace3c7..046bd53 100644 --- a/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java +++ b/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java @@ -26,17 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator; -import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; -import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector; -import mvm.rya.indexing.IndexPlanValidator.TupleReArranger; -import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.external.accumulo.AccumuloPcjStorage; -import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; @@ -46,7 +35,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; import org.openrdf.query.BindingSet; import org.openrdf.query.Dataset; import org.openrdf.query.MalformedQueryException; @@ -64,6 +55,19 @@ import org.openrdf.sail.SailException; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import 
mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.indexing.IndexPlanValidator.IndexPlanValidator; +import mvm.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; +import mvm.rya.indexing.IndexPlanValidator.ThreshholdPlanSelector; +import mvm.rya.indexing.IndexPlanValidator.TupleReArranger; +import mvm.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + /** * {@link QueryOptimizer} which matches {@link TupleExpr}s associated with * pre-computed queries to sub-queries of a given query. Each matched sub-query @@ -88,7 +92,7 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { public PCJOptimizer() { } - public PCJOptimizer(Configuration conf) { + public PCJOptimizer(final Configuration conf) { this.conf = conf; try { indexSet = PCJOptimizerUtilities.getValidPCJs(getAccIndices(conf)); // TODO @@ -105,19 +109,18 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { init = true; } - public PCJOptimizer(List<ExternalTupleSet> indices, boolean useOptimalPcj) { + public PCJOptimizer(final List<ExternalTupleSet> indices, final boolean useOptimalPcj) { this.indexSet = PCJOptimizerUtilities.getValidPCJs(indices); conf = new Configuration(); conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, useOptimalPcj); } @Override - public void setConf(Configuration conf) { + public void setConf(final Configuration conf) { this.conf = conf; if (!init) { try { - indexSet = PCJOptimizerUtilities - .getValidPCJs(getAccIndices(conf)); + indexSet = PCJOptimizerUtilities.getValidPCJs(getAccIndices(conf)); } catch (MalformedQueryException | SailException | 
QueryEvaluationException | TableNotFoundException | AccumuloException | AccumuloSecurityException @@ -141,17 +144,17 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { * - the query to be optimized */ @Override - public void optimize(TupleExpr tupleExpr, Dataset dataset, - BindingSet bindings) { + public void optimize(TupleExpr tupleExpr, final Dataset dataset, + final BindingSet bindings) { - Projection projection = PCJOptimizerUtilities.getProjection(tupleExpr); + final Projection projection = PCJOptimizerUtilities.getProjection(tupleExpr); if (projection == null) { log.debug("TupleExpr has no Projection. Invalid TupleExpr."); return; } - IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( + final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( tupleExpr, indexSet); - List<ExternalTupleSet> pcjs = iep.getNormalizedIndices(); + final List<ExternalTupleSet> pcjs = iep.getNormalizedIndices(); // first standardize query by pulling all filters to top of query if // they exist // using TopOfQueryFilterRelocator @@ -160,9 +163,9 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { if (ConfigUtils.getUseOptimalPCJ(conf) && pcjs.size() > 0) { // get potential relevant index combinations - ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator( + final ValidIndexCombinationGenerator vic = new ValidIndexCombinationGenerator( tupleExpr); - Iterator<List<ExternalTupleSet>> iter = vic + final Iterator<List<ExternalTupleSet>> iter = vic .getValidIndexCombos(pcjs); TupleExpr bestTup = null; TupleExpr tempTup = null; @@ -171,14 +174,14 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { while (iter.hasNext()) { // apply join visitor to place external index nodes in query - TupleExpr clone = tupleExpr.clone(); + final TupleExpr clone = tupleExpr.clone(); QuerySegmentPCJMatchVisitor.matchPCJs(clone, iter.next()); // get all valid execution plans for given external 
index // combination by considering all // permutations of nodes in TupleExpr - IndexPlanValidator ipv = new IndexPlanValidator(false); - Iterator<TupleExpr> validTups = ipv + final IndexPlanValidator ipv = new IndexPlanValidator(false); + final Iterator<TupleExpr> validTups = ipv .getValidTuples(TupleReArranger.getTupleReOrderings( clone).iterator()); @@ -187,7 +190,7 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { // for number of external index nodes, common variables among // joins in execution plan, and number of // external products in execution plan - ThreshholdPlanSelector tps = new ThreshholdPlanSelector( + final ThreshholdPlanSelector tps = new ThreshholdPlanSelector( tupleExpr); tempTup = tps.getThreshholdQueryPlan(validTups, .4, .5, .2, .3); @@ -200,7 +203,7 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { } } if (bestTup != null) { - Projection bestTupProject = PCJOptimizerUtilities + final Projection bestTupProject = PCJOptimizerUtilities .getProjection(bestTup); projection.setArg(bestTupProject.getArg()); } @@ -232,56 +235,56 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { private QuerySegmentPCJMatchVisitor() { }; - public static void matchPCJs(TupleExpr te, - List<ExternalTupleSet> indexSet) { + public static void matchPCJs(final TupleExpr te, + final List<ExternalTupleSet> indexSet) { pcjs = indexSet; te.visit(INSTANCE); } @Override - public void meet(Join node) { - PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); - for (ExternalTupleSet pcj : pcjs) { + public void meet(final Join node) { + final PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); + for (final ExternalTupleSet pcj : pcjs) { matcher.matchPCJ(pcj); } node.replaceWith(matcher.getQuery()); - Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); + final Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); - for (TupleExpr tupleExpr : 
unmatched) { + for (final TupleExpr tupleExpr : unmatched) { tupleExpr.visit(this); } } @Override - public void meet(LeftJoin node) { - PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); - for (ExternalTupleSet pcj : pcjs) { + public void meet(final LeftJoin node) { + final PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); + for (final ExternalTupleSet pcj : pcjs) { matcher.matchPCJ(pcj); } node.replaceWith(matcher.getQuery()); - Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); + final Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); - for (TupleExpr tupleExpr : unmatched) { + for (final TupleExpr tupleExpr : unmatched) { tupleExpr.visit(this); } } @Override - public void meet(Filter node) { - PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); - for (ExternalTupleSet pcj : pcjs) { + public void meet(final Filter node) { + final PCJMatcher matcher = PCJMatcherFactory.getPCJMatcher(node); + for (final ExternalTupleSet pcj : pcjs) { matcher.matchPCJ(pcj); } node.replaceWith(matcher.getQuery()); - Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); + final Set<TupleExpr> unmatched = matcher.getUnmatchedArgs(); PCJOptimizerUtilities.relocateFilters(matcher.getFilters()); - for (TupleExpr tupleExpr : unmatched) { + for (final TupleExpr tupleExpr : unmatched) { tupleExpr.visit(this); } } @@ -305,40 +308,44 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { * @throws AccumuloSecurityException * @throws PcjException */ - private static List<ExternalTupleSet> getAccIndices(Configuration conf) + private static List<ExternalTupleSet> getAccIndices(final Configuration conf) throws MalformedQueryException, SailException, QueryEvaluationException, TableNotFoundException, AccumuloException, AccumuloSecurityException, PcjException { requireNonNull(conf); - String tablePrefix = requireNonNull(conf - .get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)); - 
Connector conn = requireNonNull(ConfigUtils.getConnector(conf)); + final String tablePrefix = requireNonNull(conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)); + final Connector conn = requireNonNull(ConfigUtils.getConnector(conf)); List<String> tables = null; if (conf instanceof RdfCloudTripleStoreConfiguration) { tables = ((RdfCloudTripleStoreConfiguration) conf).getPcjTables(); } // this maps associates pcj table name with pcj sparql query - Map<String, String> indexTables = Maps.newLinkedHashMap(); - PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix); - PcjTableNameFactory pcjFactory = new PcjTableNameFactory(); + final Map<String, String> indexTables = Maps.newLinkedHashMap(); + final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix); + final PcjTableNameFactory pcjFactory = new PcjTableNameFactory(); - boolean tablesProvided = tables != null && !tables.isEmpty(); + final boolean tablesProvided = tables != null && !tables.isEmpty(); if (tablesProvided) { //if tables provided, associate table name with sparql for (final String table : tables) { indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql()); } + } else if(hasRyaDetails(tablePrefix, conn)) { + // If this is a newer install of Rya, and it has PCJ Details, then use those. + final List<String> ids = storage.listPcjs(); + for(final String id: ids) { + indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql()); + } } else { - //if no tables are provided by user, get ids for rya instance id, create table name, - //and associate table name with sparql - // TODO: storage.listPcjs() returns tablenames, not PCJ-IDs. - // See mvm.rya.indexing.external.accumulo.AccumuloPcjStorage.dropPcj(String) - List<String> ids = storage.listPcjs(); - for(String id: ids) { - indexTables.put(id, storage.getPcjMetadata(id).getSparql()); + // Otherwise figure it out by scanning tables. 
+ final PcjTables pcjTables = new PcjTables(); + for(final String table : conn.tableOperations().list()) { + if(table.startsWith(tablePrefix + "INDEX")) { + indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql()); + } } } @@ -355,4 +362,13 @@ public class PCJOptimizer implements QueryOptimizer, Configurable { return index; } + private static boolean hasRyaDetails(final String ryaInstanceName, final Connector conn) { + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(conn, ryaInstanceName); + try { + detailsRepo.getRyaInstanceDetails(); + return true; + } catch(final RyaDetailsRepositoryException e) { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailConfig.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailConfig.java b/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailConfig.java index e7b7960..d66fdd7 100644 --- a/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailConfig.java +++ b/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailConfig.java @@ -1,5 +1,16 @@ package mvm.rya.sail.config; +import org.openrdf.model.Graph; +import org.openrdf.model.Literal; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.util.GraphUtil; +import org.openrdf.model.util.GraphUtilException; +import org.openrdf.sail.config.SailConfigException; +import org.openrdf.sail.config.SailImplConfigBase; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -8,9 +19,9 @@ package mvm.rya.sail.config; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,17 +32,10 @@ package mvm.rya.sail.config; import mvm.rya.accumulo.AccumuloRdfConfiguration; -import org.openrdf.model.Graph; -import org.openrdf.model.Literal; -import org.openrdf.model.Resource; -import org.openrdf.model.URI; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.util.GraphUtil; -import org.openrdf.model.util.GraphUtilException; -import org.openrdf.sail.config.SailConfigException; -import org.openrdf.sail.config.SailImplConfigBase; - +/** + * @deprecated Use {@link AccumuloRdfConfiguration} instead. 
+ */ +@Deprecated public class RyaAccumuloSailConfig extends SailImplConfigBase { public static final String NAMESPACE = "http://rya.apache.org/RyaAccumuloSail/Config#"; @@ -43,7 +47,7 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { public static final URI IS_MOCK; static { - ValueFactory factory = ValueFactoryImpl.getInstance(); + final ValueFactory factory = ValueFactoryImpl.getInstance(); USER = factory.createURI(NAMESPACE, "user"); PASSWORD = factory.createURI(NAMESPACE, "password"); INSTANCE = factory.createURI(NAMESPACE, "instance"); @@ -56,7 +60,7 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { private String instance = "dev"; private String zookeepers = "zoo1,zoo2,zoo3"; private boolean isMock = false; - + public RyaAccumuloSailConfig() { super(RyaAccumuloSailFactory.SAIL_TYPE); } @@ -65,7 +69,7 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { return user; } - public void setUser(String user) { + public void setUser(final String user) { this.user = user; } @@ -73,7 +77,7 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { return password; } - public void setPassword(String password) { + public void setPassword(final String password) { this.password = password; } @@ -81,7 +85,7 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { return instance; } - public void setInstance(String instance) { + public void setInstance(final String instance) { this.instance = instance; } @@ -89,7 +93,7 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { return zookeepers; } - public void setZookeepers(String zookeepers) { + public void setZookeepers(final String zookeepers) { this.zookeepers = zookeepers; } @@ -97,12 +101,12 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { return isMock; } - public void setMock(boolean isMock) { + public void setMock(final boolean isMock) { this.isMock = isMock; } public AccumuloRdfConfiguration toRdfConfiguation() { - 
AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); return conf; } @@ -112,10 +116,11 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { } @Override - public Resource export(Graph graph) { - Resource implNode = super.export(graph); + public Resource export(final Graph graph) { + final Resource implNode = super.export(graph); @SuppressWarnings("deprecation") + final ValueFactory v = graph.getValueFactory(); graph.add(implNode, USER, v.createLiteral(user)); @@ -128,32 +133,32 @@ public class RyaAccumuloSailConfig extends SailImplConfigBase { } @Override - public void parse(Graph graph, Resource implNode) throws SailConfigException { + public void parse(final Graph graph, final Resource implNode) throws SailConfigException { super.parse(graph, implNode); System.out.println("parsing"); try { - Literal userLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, USER); + final Literal userLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, USER); if (userLit != null) { setUser(userLit.getLabel()); } - Literal pwdLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, PASSWORD); + final Literal pwdLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, PASSWORD); if (pwdLit != null) { setPassword(pwdLit.getLabel()); } - Literal instLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, INSTANCE); + final Literal instLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, INSTANCE); if (instLit != null) { setInstance(instLit.getLabel()); } - Literal zooLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, ZOOKEEPERS); + final Literal zooLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, ZOOKEEPERS); if (zooLit != null) { setZookeepers(zooLit.getLabel()); } - Literal mockLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, IS_MOCK); + final Literal mockLit = GraphUtil.getOptionalObjectLiteral(graph, implNode, IS_MOCK); if (mockLit != null) { 
setMock(Boolean.parseBoolean(mockLit.getLabel())); } - } catch (GraphUtilException e) { + } catch (final GraphUtilException e) { throw new SailConfigException(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailFactory.java b/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailFactory.java index c61eb68..83641bd 100644 --- a/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailFactory.java +++ b/extras/indexing/src/main/java/mvm/rya/sail/config/RyaAccumuloSailFactory.java @@ -1,5 +1,17 @@ package mvm.rya.sail.config; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.openrdf.sail.Sail; +import org.openrdf.sail.config.SailConfigException; +import org.openrdf.sail.config.SailFactory; +import org.openrdf.sail.config.SailImplConfig; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -8,9 +20,9 @@ package mvm.rya.sail.config; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,18 +37,10 @@ import mvm.rya.api.persist.RyaDAOException; import mvm.rya.indexing.accumulo.ConfigUtils; import mvm.rya.rdftriplestore.RdfCloudTripleStore; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.openrdf.sail.Sail; -import org.openrdf.sail.config.SailConfigException; -import org.openrdf.sail.config.SailFactory; -import org.openrdf.sail.config.SailImplConfig; - +/** + * @deprecated Use {@link RyaSailFactory} instead. 
+ */ +@Deprecated public class RyaAccumuloSailFactory implements SailFactory { public static final String SAIL_TYPE = "rya:RyaAccumuloSail"; @@ -47,13 +51,13 @@ public class RyaAccumuloSailFactory implements SailFactory { } @Override - public Sail getSail(SailImplConfig config) throws SailConfigException { + public Sail getSail(final SailImplConfig config) throws SailConfigException { try { - RdfCloudTripleStore store = new RdfCloudTripleStore(); - RyaAccumuloSailConfig cbconfig = (RyaAccumuloSailConfig) config; + final RdfCloudTripleStore store = new RdfCloudTripleStore(); + final RyaAccumuloSailConfig cbconfig = (RyaAccumuloSailConfig) config; - String instanceName = cbconfig.getInstance(); - String zooKeepers = cbconfig.getZookeepers(); + final String instanceName = cbconfig.getInstance(); + final String zooKeepers = cbconfig.getZookeepers(); Instance i; if (cbconfig.isMock()) { @@ -62,14 +66,14 @@ public class RyaAccumuloSailFactory implements SailFactory { i = new ZooKeeperInstance(instanceName, zooKeepers); } - String user = cbconfig.getUser(); - String pass = cbconfig.getPassword(); + final String user = cbconfig.getUser(); + final String pass = cbconfig.getPassword(); - Connector connector = i.getConnector(user, new PasswordToken(pass)); - AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); + final Connector connector = i.getConnector(user, new PasswordToken(pass)); + final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); crdfdao.setConnector(connector); - AccumuloRdfConfiguration conf = cbconfig.toRdfConfiguation(); + final AccumuloRdfConfiguration conf = cbconfig.toRdfConfiguation(); ConfigUtils.setIndexers(conf); conf.setDisplayQueryPlan(true); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/sail/config/RyaSailFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/sail/config/RyaSailFactory.java 
b/extras/indexing/src/main/java/mvm/rya/sail/config/RyaSailFactory.java index 75daec3..44c5ee7 100644 --- a/extras/indexing/src/main/java/mvm/rya/sail/config/RyaSailFactory.java +++ b/extras/indexing/src/main/java/mvm/rya/sail/config/RyaSailFactory.java @@ -1,5 +1,3 @@ -package mvm.rya.sail.config; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -9,7 +7,7 @@ package mvm.rya.sail.config; * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,6 +16,9 @@ package mvm.rya.sail.config; * specific language governing permissions and limitations * under the License. */ +package mvm.rya.sail.config; + +import static java.util.Objects.requireNonNull; import java.net.UnknownHostException; import java.util.Objects; @@ -29,6 +30,7 @@ import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.hadoop.conf.Configuration; import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,12 +58,20 @@ import mvm.rya.rdftriplestore.inference.InferenceEngineException; public class RyaSailFactory { private static final Logger LOG = LoggerFactory.getLogger(RyaSailFactory.class); + /** + * Creates an instance of {@link Sail} that is attached to a Rya instance. + * + * @param conf - Configures how the Sail object will be constructed. (not null) + * @return A {@link Sail} object that is backed by a Rya datastore. + * @throws SailException The object could not be created. 
+ */ public static Sail getInstance(final Configuration conf) throws AccumuloException, - AccumuloSecurityException, RyaDAOException, InferenceEngineException { + AccumuloSecurityException, RyaDAOException, InferenceEngineException, SailException { + requireNonNull(conf); return getRyaSail(conf); } - private static Sail getRyaSail(final Configuration config) throws InferenceEngineException, RyaDAOException, AccumuloException, AccumuloSecurityException { + private static Sail getRyaSail(final Configuration config) throws InferenceEngineException, RyaDAOException, AccumuloException, AccumuloSecurityException, SailException { final RdfCloudTripleStore store = new RdfCloudTripleStore(); final RyaDAO<?> dao; final RdfCloudTripleStoreConfiguration rdfConfig; @@ -101,6 +111,8 @@ public class RyaSailFactory { store.setInferenceEngine(inferenceEngine); } + store.initialize(); + return store; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJIT.java new file mode 100644 index 0000000..d2e028d --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJIT.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.util.List; +import java.util.Set; + +import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; + +import mvm.rya.api.client.CreatePCJ; +import mvm.rya.api.client.Install; +import mvm.rya.api.client.Install.DuplicateInstanceNameException; +import mvm.rya.api.client.Install.InstallConfiguration; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; + +/** + * Integration tests the methods of {@link AccumuloCreatePCJ}. + */ +public class AccumuloCreatePCJIT extends FluoITBase { + + @Test + public void createPCJ() throws Exception { + // Initialize the commands that will be used by this test. 
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + super.cluster.getInstanceName(), + super.cluster.getZooKeepers()); + + final CreatePCJ createPCJ = new AccumuloCreatePCJ(connectionDetails, accumuloConn); + + // Create a PCJ. + final String sparql = + "SELECT ?x " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://TacoJoint>." + + "}"; + final String pcjId = createPCJ.createPCJ(RYA_INSTANCE_NAME, sparql); + + // Verify the RyaDetails were updated to include the new PCJ. + final Optional<RyaDetails> ryaDetails = new AccumuloGetInstanceDetails(connectionDetails, accumuloConn).getDetails(RYA_INSTANCE_NAME); + final PCJDetails pcjDetails = ryaDetails.get().getPCJIndexDetails().getPCJDetails().get(pcjId); + + assertEquals(pcjId, pcjDetails.getId()); + assertFalse( pcjDetails.getLastUpdateTime().isPresent() ); + assertEquals(PCJUpdateStrategy.INCREMENTAL, pcjDetails.getUpdateStrategy().get()); + + // Verify the PCJ's metadata was initialized. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + assertEquals(sparql, pcjMetadata.getSparql()); + assertEquals(0L, pcjMetadata.getCardinality()); + + // Verify a Query ID was added for the query within the Fluo app. + final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient); + assertEquals(1, fluoQueryIds.size()); + + // Insert some statements into Rya. 
+ final ValueFactory vf = ryaRepo.getValueFactory(); + ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + + ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")); + + ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + + // Verify the correct results were exported. + fluo.waitForObservers(); + + final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("x", vf.createURI("http://Bob")); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("x", vf.createURI("http://Charlie")); + + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie); + + assertEquals(expected, results); + } + + @Test(expected = InstanceDoesNotExistException.class) + public void createPCJ_instanceDoesNotExist() throws InstanceDoesNotExistException, RyaClientException { + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + super.cluster.getInstanceName(), + super.cluster.getZooKeepers()); + + // Create a PCJ for a Rya instance that doesn't exist. 
+ final CreatePCJ createPCJ = new AccumuloCreatePCJ(connectionDetails, accumuloConn); + createPCJ.createPCJ("invalidInstanceName", "SELECT * where { ?a ?b ?c }"); + } + + @Test(expected = RyaClientException.class) + public void createPCJ_invalidSparql() throws DuplicateInstanceNameException, RyaClientException { + // Install an instance of Rya. + final String instanceName = "testInstance_"; + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(true) + .setEnableEntityCentricIndex(false) + .setEnableFreeTextIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(true) + .setEnableGeoIndex(false) + .setFluoPcjAppName("fluo_app_name") + .build(); + + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + super.cluster.getInstanceName(), + super.cluster.getZooKeepers()); + + final Install install = new AccumuloInstall(connectionDetails, accumuloConn); + install.install(instanceName, installConfig); + + // Create a PCJ using invalid SPARQL. + final CreatePCJ createPCJ = new AccumuloCreatePCJ(connectionDetails, accumuloConn); + createPCJ.createPCJ(instanceName, "not valid sparql"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJIT.java new file mode 100644 index 0000000..1b6989a --- /dev/null +++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJIT.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Set; + +import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryException; + +import com.google.common.collect.Sets; + +import mvm.rya.api.client.CreatePCJ; +import mvm.rya.api.client.DeletePCJ; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.RyaClientException; + +/** + * Integration tests the methods of {@link AccumuloDeletePCJ}. + */ +public class AccumuloDeletePCJIT extends FluoITBase { + + @Test + public void deletePCJ() throws InstanceDoesNotExistException, RyaClientException, PCJStorageException, RepositoryException { + // Initialize the commands that will be used by this test.
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + super.cluster.getInstanceName(), + super.cluster.getZooKeepers()); + + final CreatePCJ createPCJ = new AccumuloCreatePCJ(connectionDetails, accumuloConn); + + // Create a PCJ. + final String sparql = + "SELECT ?x " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://TacoJoint>." + + "}"; + final String pcjId = createPCJ.createPCJ(RYA_INSTANCE_NAME, sparql); + + // Verify a Query ID was added for the query within the Fluo app. + List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient); + assertEquals(1, fluoQueryIds.size()); + + // Insert some statements into Rya. + final ValueFactory vf = ryaRepo.getValueFactory(); + ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + + ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")); + + ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + + // Verify the correct results were exported. 
+ fluo.waitForObservers(); + + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("x", vf.createURI("http://Bob")); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("x", vf.createURI("http://Charlie")); + + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie); + assertEquals(expected, results); + + // Delete the PCJ. + final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn); + deletePCJ.deletePCJ(RYA_INSTANCE_NAME, pcjId); + + // Ensure the PCJ's metadata has been removed from the storage. + assertTrue( pcjStorage.listPcjs().isEmpty() ); + + // Ensure the PCJ has been removed from the Fluo application. + fluo.waitForObservers(); + + // Verify the query's Query ID was removed from the Fluo app. + fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient); + assertEquals(0, fluoQueryIds.size()); + } + + @Test(expected = InstanceDoesNotExistException.class) + public void deletePCJ_instanceDoesNotExist() throws InstanceDoesNotExistException, RyaClientException { + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + super.cluster.getInstanceName(), + super.cluster.getZooKeepers()); + + // Delete a PCJ for a Rya instance that doesn't exist.
+ final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn); + deletePCJ.deletePCJ("doesNotExist", "randomID"); + } + + @Test(expected = RyaClientException.class) + public void deletePCJ_pcjDoesNotExist() throws InstanceDoesNotExistException, RyaClientException { + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + ACCUMULO_USER, + ACCUMULO_PASSWORD.toCharArray(), + super.cluster.getInstanceName(), + super.cluster.getZooKeepers()); + + // Delete the PCJ. + final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn); + deletePCJ.deletePCJ(RYA_INSTANCE_NAME, "randomID"); + } +} \ No newline at end of file