[
https://issues.apache.org/jira/browse/RYA-416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313637#comment-16313637
]
ASF GitHub Bot commented on RYA-416:
------------------------------------
Github user jessehatfield commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/254#discussion_r159943074
--- Diff:
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Map;
+
+import org.bson.Document;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.mongodb.client.AggregateIterable;
+import com.mongodb.client.MongoCursor;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * An iterator that converts the documents resulting from an
+ * {@link AggregationPipelineQueryNode} into {@link BindingSet}s.
+ */
+public class PipelineResultIteration implements CloseableIteration<BindingSet, QueryEvaluationException> {
+ private static final int BATCH_SIZE = 1000;
+ private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+ private final MongoCursor<Document> cursor;
+ private final Map<String, String> varToOriginalName;
+ private final BindingSet bindings;
+ private BindingSet nextSolution = null;
+
+ /**
+ * Constructor.
+ * @param aggIter Iterator of documents in AggregationPipelineQueryNode's
+ * intermediate solution representation.
+ * @param varToOriginalName A mapping from field names in the pipeline
+ * result documents to equivalent variable names in the original query.
+ * Where an entry does not exist for a field, the field name and variable
+ * name are assumed to be the same.
+ * @param bindings A partial solution. May be empty.
+ */
+ public PipelineResultIteration(AggregateIterable<Document> aggIter,
+ Map<String, String> varToOriginalName,
+ BindingSet bindings) {
+ aggIter.batchSize(BATCH_SIZE);
+ this.cursor = aggIter.iterator();
+ this.varToOriginalName = varToOriginalName;
+ this.bindings = bindings;
+ lookahead();
+ }
+
+ private void lookahead() {
+ while (nextSolution == null && cursor.hasNext()) {
+ nextSolution = docToBindingSet(cursor.next());
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws QueryEvaluationException {
+ lookahead();
+ return nextSolution != null;
+ }
+
+ @Override
+ public BindingSet next() throws QueryEvaluationException {
+ lookahead();
+ BindingSet solution = nextSolution;
+ nextSolution = null;
+ return solution;
+ }
+
+ @Override
+ public void remove() throws QueryEvaluationException {
+ lookahead();
+ nextSolution = null;
+ }
+
+ @Override
+ public void close() throws QueryEvaluationException {
+ cursor.close();
+ }
+
+ private QueryBindingSet docToBindingSet(Document result) {
+ QueryBindingSet bindingSet = new QueryBindingSet(bindings);
+ Document valueSet = result.get(AggregationPipelineQueryNode.VALUES, Document.class);
+ Document typeSet = result.get(AggregationPipelineQueryNode.TYPES, Document.class);
+ if (valueSet != null) {
+ for (Map.Entry<String, Object> entry : valueSet.entrySet()) {
+ String fieldName = entry.getKey();
+ String valueString = entry.getValue().toString();
+ String typeString = typeSet == null ? null : typeSet.getString(fieldName);
+ String varName = varToOriginalName.getOrDefault(fieldName, fieldName);
+ Value varValue;
+ if (typeString == null || typeString.equals(XMLSchema.ANYURI.stringValue())) {
+ varValue = VF.createURI(valueString);
+ }
+ else {
+ varValue = VF.createLiteral(valueString, VF.createURI(typeString));
+ }
+ Binding existingBinding = bindingSet.getBinding(varName);
+ // If this variable is not already bound, add it.
+ if (existingBinding == null) {
+ bindingSet.addBinding(varName, varValue);
+ }
+ // If it's bound to something else, the solutions are incompatible.
+ else if (!existingBinding.getValue().equals(varValue)) {
+ return null;
--- End diff --
This method is only called internally, by lookahead(). It returns null when
the next solution returned by the pipeline is incompatible with any bindings
that were passed into the constructor. (The constructor may have received a
non-empty BindingSet if the pipeline query is being evaluated in the context of
a specific partial solution.) The lookahead method will use this to loop
through pipeline results until it finds one that is compatible with the
bindings, or until the pipeline yields no more results. The null value should
never make its way to the caller. I didn't use an exception here because it
isn't really an exceptional case in that context: we should expect some of the
solutions to the pipeline subquery not to match the specific candidate solution.
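For illustration, the skip-on-null lookahead pattern described above can be sketched in isolation (plain Java, hypothetical names, independent of the Rya and OpenRDF classes):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Minimal sketch of the lookahead pattern: convert() may return null for
// incompatible items, and lookahead() silently skips those, so the caller
// never observes a null element.
public class LookaheadSketch<S, T> implements Iterator<T> {
    private final Iterator<S> source;
    private final Function<S, T> convert; // returns null for incompatible items
    private T next = null;

    public LookaheadSketch(Iterator<S> source, Function<S, T> convert) {
        this.source = source;
        this.convert = convert;
        lookahead();
    }

    private void lookahead() {
        // Keep consuming until we find a convertible item or run out.
        while (next == null && source.hasNext()) {
            next = convert.apply(source.next());
        }
    }

    @Override
    public boolean hasNext() {
        lookahead();
        return next != null;
    }

    @Override
    public T next() {
        lookahead();
        T result = next;
        next = null;
        return result;
    }

    public static void main(String[] args) {
        // Odd numbers stand in for "incompatible" solutions and are skipped.
        Iterator<Integer> it = new LookaheadSketch<>(
                List.of(1, 2, 3, 4, 5).iterator(),
                n -> n % 2 == 0 ? n : null);
        while (it.hasNext()) {
            System.out.println(it.next()); // prints 2, then 4
        }
    }
}
```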
Skipping over non-matching results client-side might be inefficient, and we
could implement this instead by adding a "$match" step for the candidate
solution onto the pipeline. But this wouldn't solve the main inefficiency, which
is that either way we're executing the pipeline subquery for each candidate
solution. So the approach I've taken here is just to do something that
satisfies the API correctly, and hope that we can structure queries to avoid
the inherent inefficiency. (For example, if we had `Join(Join(SP1, SP2),
Join(SP3, SP4))`, the visitor as currently implemented would turn that into a
join of two pipeline nodes and have to pass binding sets from one into the
other, which will be slow regardless of how that's done. But if it's instead
parsed as `Join(Join(Join(SP1, SP2), SP3), SP4)` then we can translate the
whole thing into one pipeline query, which should be faster.)
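To make the server-side alternative concrete, here is a hedged sketch of building the "$match" stage mentioned above, which would restrict pipeline results to documents agreeing with a candidate partial solution. Plain Maps stand in for org.bson.Document, and the "values." field prefix is a hypothetical stand-in for the intermediate representation's value fields:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MatchStageSketch {
    /**
     * Build a "$match" pipeline stage from a candidate partial solution.
     * Each bound variable becomes an equality criterion on the assumed
     * "values.<var>" field of the intermediate documents. (Sketch only;
     * the real code would use org.bson.Document and the actual field
     * names from AggregationPipelineQueryNode.)
     */
    public static Map<String, Object> matchStage(Map<String, String> candidateBindings) {
        Map<String, Object> criteria = new LinkedHashMap<>();
        for (Map.Entry<String, String> binding : candidateBindings.entrySet()) {
            criteria.put("values." + binding.getKey(), binding.getValue());
        }
        Map<String, Object> stage = new LinkedHashMap<>();
        stage.put("$match", criteria);
        return stage;
    }
}
```

Appending such a stage to the pipeline would move the compatibility filtering server-side, but, as noted, the pipeline subquery would still run once per candidate solution.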
I added a test for this iterator which includes the case of a provided
BindingSet and incompatible partial solutions, verifying that the expected
(non-null) results are returned.
> Add the ability to use the MongoDB aggregation pipeline to evaluate simple
> SPARQL expressions
> ------------------------------------------------------------------------------------------
>
> Key: RYA-416
> URL: https://issues.apache.org/jira/browse/RYA-416
> Project: Rya
> Issue Type: New Feature
> Reporter: Jesse Hatfield
> Assignee: Jesse Hatfield
>
> MongoDB provides the [aggregation pipeline
> framework|https://docs.mongodb.com/manual/core/aggregation-pipeline/] for
> multi-stage data processing. Currently, the query engine invokes this
> framework to apply individual statement patterns (using a "$match" expression
> for each and iterating through the results), then applies higher-level query
> operations (join, filter, select, project, etc) client-side.
> In principle, those high-level query operations could be rewritten as
> aggregation pipeline stages as well ($group, $match, $project, etc). This
> would allow more query evaluation logic to be executed by the MongoDB server
> itself, enabling server-side optimization. This could be used as a general
> query optimization, but would additionally be useful for any tool that only
> needed to write query results back to the server: adding a write step to the
> end of the resulting pipeline could obviate the need to communicate
> individual results to the client at all.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)