[ https://issues.apache.org/jira/browse/RYA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15977546#comment-15977546 ]
ASF GitHub Bot commented on RYA-260: ------------------------------------ Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/156#discussion_r112555177 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java --- @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import org.openrdf.model.impl.DecimalLiteralImpl; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.util.MathUtil; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.ImmutableMap; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Updates 
the results of an Aggregate node when its child has added a new Binding Set to its results. + */ +@DefaultAnnotation(NonNull.class) +public class AggregationResultUpdater { + private static final Logger log = Logger.getLogger(AggregationResultUpdater.class); + + private static final AggregationStateSerDe AGG_STATE_SERDE = new ObjectSerializationAggregationStateSerDe(); + private static final VisibilityBindingSetSerde BS_SERDE = new VisibilityBindingSetSerde(); + + private static final ImmutableMap<AggregationType, AggregationFunction> FUNCTIONS; + static { + final ImmutableMap.Builder<AggregationType, AggregationFunction> builder = ImmutableMap.builder(); + builder.put(AggregationType.COUNT, new CountFunction()); + builder.put(AggregationType.SUM, new SumFunction()); + builder.put(AggregationType.AVERAGE, new AverageFunction()); + builder.put(AggregationType.MIN, new MinFunction()); + builder.put(AggregationType.MAX, new MaxFunction()); + FUNCTIONS = builder.build(); + } + + /** + * Updates the results of an Aggregation node where its child has emitted a new Binding Set. + * + * @param tx - The transaction all Fluo queries will use. (not null) + * @oaram childRow - The Row Key of the child Binding Set that changed. (not null) + * @param childCol - The Column Key of the child Binding Set that changed. (not null) + * @param aggregationMetadata - The metadata of the Aggregation node whose results will be updated. (not null) + * @throws Exception The update could not be successfulyl performed. + */ + public void updateAggregateResults( + final TransactionBase tx, + final Bytes childRow, + final Column childCol, + final AggregationMetadata aggregationMetadata) throws Exception { + requireNonNull(tx); + requireNonNull(childRow); + requireNonNull(childCol); + requireNonNull(aggregationMetadata); + + // Make sure the child binding set has not already been handled. 
+ final Bytes childValue = tx.get(childRow, childCol); + final VisibilityBindingSet childBindingSet = BS_SERDE.deserialize(childValue); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Row Key: " + childRow + "\n" + + "Column Key: " + childCol + "\n" + + "Binding Set:\n" + childBindingSet + "\n"); + + // The Row ID for the Aggregation State that needs to be updated is defined by the Group By variables. + final String aggregationNodeId = aggregationMetadata.getNodeId(); + final VariableOrder groupByVars = aggregationMetadata.getGroupByVariableOrder(); + final Bytes rowId = RowKeyUtil.makeRowKey(aggregationNodeId, groupByVars, childBindingSet); + + // Load the old state from the bytes if one was found; otherwise initialize the state. + final Optional<Bytes> stateBytes = Optional.ofNullable( tx.get(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET) ); + + final AggregationState state; + if(stateBytes.isPresent()) { + // Deserialize the old state + final byte[] bytes = stateBytes.get().toArray(); + state = AGG_STATE_SERDE.deserialize(bytes); + } else { + // Initialize a new state. + state = new AggregationState(); + + // If we have group by bindings, their values need to be added to the state's binding set. + final MapBindingSet bindingSet = state.getBindingSet(); + for(final String variable : aggregationMetadata.getGroupByVariableOrder()) { + bindingSet.addBinding( childBindingSet.getBinding(variable) ); + } + } + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Before Update: " + state.getBindingSet().toString() + "\n"); + + // Update the visibilities of the result binding set based on the child's visibilities. + final String oldVisibility = state.getVisibility(); + final String updateVisibilities = VisibilitySimplifier.unionAndSimplify(oldVisibility, childBindingSet.getVisibility()); + state.setVisibility(updateVisibilities); + + // Update the Aggregation State with each Aggregation function included within this group. 
+ for(final AggregationElement aggregation : aggregationMetadata.getAggregations()) { + final AggregationType type = aggregation.getAggregationType(); + final AggregationFunction function = FUNCTIONS.get(type); + if(function == null) { + throw new RuntimeException("Unrecognized aggregation function: " + type); + } + + function.update(aggregation, state, childBindingSet); + } + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "After Update:" + state.getBindingSet().toString() + "\n" ); + + // Store the updated state. This will write on top of any old state that was present for the Group By values. + tx.set(rowId, FluoQueryColumns.AGGREGATION_BINDING_SET, Bytes.of(AGG_STATE_SERDE.serialize(state))); + } + + /** + * A function that updates an {@link AggregationState}. + */ + public static interface AggregationFunction { + + /** + * Updates an {@link AggregationState} based on the values of a child Binding Set. + * + * @param aggregation - Defines which function needs to be performed as well as any details required + * to do the aggregation work. (not null) + * @param state - The state that will be updated. (not null) + * @param childBindingSet - The Binding Set whose values will be used to update the state. + */ + public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet); + } + + /** + * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name + * that is being counted by the {@link AggregationElement}. + */ + public static final class CountFunction implements AggregationFunction { + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements."); + + // Only add one to the count if the child contains the binding that we are counting. 
+ final String aggregatedName = aggregation.getAggregatedBidingName(); + if(childBindingSet.hasBinding(aggregatedName)) { --- End diff -- Spelling on method name: getAggregatedBidingName should be getAggregatedBindingName. > Add Aggregation support for Fluo/PCJ app > ---------------------------------------- > > Key: RYA-260 > URL: https://issues.apache.org/jira/browse/RYA-260 > Project: Rya > Issue Type: New Feature > Reporter: Andrew Smith > Assignee: Kevin Chilton > > A user must be able to submit a PCJ query that contains the following > aggregation functions from SPARQL: > * Sum > * Count > * Average > * Min > * Max > This task does not include any aggregations that are used together with a GROUP BY > clause. We only need to support queries that have the aggregation within the > SELECT section. > For example, the following query should be processed: > {code} > SELECT (avg(?price) as ?averagePrice) > { > <urn:BookA> <urn:price> ?price. > } > {code} > And the following query should not be processed because it requires a group > by: > {code} > SELECT ?title (avg(?price) as ?averagePrice) > { > ?title <urn:price> ?price. > } > GROUP BY ?title > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)