Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/172#discussion_r160453185
--- Diff:
extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoBatchUpdatePCJ.java
---
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.rya.api.client.BatchUpdatePCJ;
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.PCJDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import
org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import
org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import
org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import
org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandlerBase;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+
+/**
+ * A Mongo implementation of {@link BatchUpdatePCJ}.
+ */
+public class MongoBatchUpdatePCJ implements BatchUpdatePCJ {
+ private static final Logger log =
LoggerFactory.getLogger(MongoBatchUpdatePCJ.class);
+
+ private final MongoConnectionDetails connectionDetails;
+ private final InstanceExists instanceExists;
+ private final MongoClient mongoClient;
+
+ /**
+ * Constructs an instance of {@link MongoBatchUpdatePCJ}.
+ *
+ * @param connectionDetails - Details to connect to the server. (not
null)
+ * @param instanceExists - The interactor used to check if a Rya
instance exists. (not null)
+ * @param mongoClient - The {@link MongoClient} to use when batch
updating. (not null)
+ */
+ public MongoBatchUpdatePCJ(
+ final MongoConnectionDetails connectionDetails,
+ final MongoInstanceExists instanceExists,
+ final MongoClient mongoClient) {
+ this.connectionDetails = requireNonNull(connectionDetails);
+ this.instanceExists = requireNonNull(instanceExists);
+ this.mongoClient = requireNonNull(mongoClient);
+ }
+
+ @Override
+ public void batchUpdate(final String ryaInstanceName, final String
pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException,
RyaClientException {
+ requireNonNull(ryaInstanceName);
+ requireNonNull(pcjId);
+
+ Preconditions.checkState(instanceExists.exists(ryaInstanceName),
"The instance: " + ryaInstanceName + " does not exist.");
+
+ verifyPCJState(ryaInstanceName, pcjId, mongoClient);
+ updatePCJResults(ryaInstanceName, pcjId, mongoClient);
+ updatePCJMetadata(ryaInstanceName, pcjId, mongoClient);
+ }
+
+ private void verifyPCJState(final String ryaInstanceName, final String
pcjId, final MongoClient client) throws RyaClientException {
+ try {
+ // Fetch the Rya instance's details.
+ final RyaDetailsRepository detailsRepo = new
MongoRyaInstanceDetailsRepository(client, ryaInstanceName);
+ final RyaDetails ryaDetails =
detailsRepo.getRyaInstanceDetails();
+
+ // Ensure PCJs are enabled.
+ if(!ryaDetails.getPCJIndexDetails().isEnabled()) {
+ throw new RyaClientException("PCJs are not enabled for the
Rya instance named '" + ryaInstanceName + "'.");
+ }
+
+ // Ensure the PCJ exists.
+
if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) {
+ throw new PCJDoesNotExistException("The PCJ with id '" +
pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'.");
+ }
+
+ } catch(final NotInitializedException e) {
+ throw new InstanceDoesNotExistException("No RyaDetails are
initialized for the Rya instance named '" + ryaInstanceName + "'.", e);
+ } catch (final RyaDetailsRepositoryException e) {
+ throw new RyaClientException("Could not fetch the RyaDetails
for the Rya instance named '" + ryaInstanceName + "'.", e);
+ }
+ }
+
+ private void updatePCJResults(final String ryaInstanceName, final
String pcjId, final MongoClient client) throws InstanceDoesNotExistException,
PCJDoesNotExistException, RyaClientException {
+ // Things that have to be closed before we exit.
+ Sail sail = null;
+ SailConnection sailConn = null;
+
+ try(final PrecomputedJoinStorage pcjStorage = new
MongoPcjStorage(client, ryaInstanceName)) {
+ // Create an instance of Sail backed by the Rya instance.
+ sail = connectToRya(ryaInstanceName);
+ final SailRepository sailRepo = new SailRepository(sail);
+ final SailRepositoryConnection sailRepoConn =
sailRepo.getConnection();
+ // Purge the old results from the PCJ.
+ try {
+ pcjStorage.purge(pcjId);
+ } catch (final PCJStorageException e) {
+ throw new RyaClientException("Could not batch update PCJ
with ID '" + pcjId + "' because the old " +
+ "results could not be purged from it.", e);
+ }
+
+ // Parse the PCJ's SPARQL query.
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+ final String sparql = metadata.getSparql();
+ sailConn = sail.getConnection();
+ final TupleQuery tupleQuery =
sailRepoConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql);
+
+ // Execute the query.
+ final List<VisibilityBindingSet> batch = new ArrayList<>(1000);
--- End diff --
Is there a BATCH_SIZE constant defined somewhere? If so, use it here;
otherwise create one, and replace every hard-coded 1000 with it.
---