Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r160834195 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java --- @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.api.utils.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.mongodb.MongoClient; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.util.JSON; + +/** + * Creates and modifies PCJs in MongoDB. 
PCJs are stored as follows: + * + * <pre> + * <code> + * ----- PCJ Metadata Doc ----- + * { + * _id: [pcj_name]_METADATA, + * sparql: [sparql query to match results], + * varOrders: [varOrder1, VarOrder2, ..., VarOrdern] + * cardinality: [number of results] + * } + * + * ----- PCJ Results Doc ----- + * { + * pcjId: [pcj_name], + * visibilities: [visibilities] + * [binding_var1]: { + * uri: [type_uri], + * value: [value] + * } + * . + * . + * . + * [binding_varn]: { + * uri: [type_uri], + * value: [value] + * } + * } + * </code> + * </pre> + */ +public class MongoPcjDocuments { + public static final String PCJ_COLLECTION_NAME = "pcjs"; + + // metadata fields + public static final String CARDINALITY_FIELD = "cardinality"; + public static final String SPARQL_FIELD = "sparql"; + public static final String PCJ_ID = "_id"; + public static final String VAR_ORDER_FIELD = "varOrders"; + + // pcj results fields + private static final String BINDING_VALUE = "value"; + private static final String BINDING_TYPE = "rdfType"; + private static final String VISIBILITIES_FIELD = "visibilities"; + private static final String PCJ_NAME = "pcjId"; + + private final MongoCollection<Document> pcjCollection; + private static final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory(); + + /** + * Creates a new {@link MongoPcjDocuments}. + * @param client - The {@link MongoClient} to use to connect to mongo. + * @param ryaInstanceName - The rya instance to connect to. + */ + public MongoPcjDocuments(final MongoClient client, final String ryaInstanceName) { + requireNonNull(client); + requireNonNull(ryaInstanceName); + pcjCollection = client.getDatabase(ryaInstanceName).getCollection(PCJ_COLLECTION_NAME); + } + + private String makeMetadataID(final String pcjId) { + return pcjId + "_METADATA"; + } + + /** + * Creates a {@link Document} containing the metadata defining the PCJ. + * + * @param pcjId + * - The name of the PCJ. 
(not null) + * @param sparql + * - The sparql query the PCJ will use. + * @return The document built around the provided metadata. + * @throws PCJStorageException + * - Thrown when the sparql query is malformed. + */ + public Document makeMetadataDocument(final String pcjId, final String sparql) throws PCJStorageException { + requireNonNull(pcjId); + requireNonNull(sparql); + + final Set<VariableOrder> varOrders; + try { + varOrders = pcjVarOrderFactory.makeVarOrders(sparql); + } catch (final MalformedQueryException e) { + throw new PCJStorageException("Can not create the PCJ. The SPARQL is malformed.", e); + } + + return new Document() + .append(PCJ_ID, makeMetadataID(pcjId)) + .append(SPARQL_FIELD, sparql) + .append(CARDINALITY_FIELD, 0) + .append(VAR_ORDER_FIELD, varOrders); + + } + + /** + * Creates a new PCJ based on the provided metadata. The initial pcj results + * will be empty. + * + * @param pcjId + * - The unique name of the PCJ. --- End diff -- done
---