http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java index a701052..8d218af 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -18,6 +18,7 @@ */ package org.apache.rya.indexing.pcj.fluo.app.query; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import java.util.Collection; @@ -29,7 +30,6 @@ import org.apache.commons.lang3.builder.EqualsBuilder; import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -46,6 +46,7 @@ public class FluoQuery { private final Optional<QueryMetadata> queryMetadata; private final Optional<ConstructQueryMetadata> constructMetadata; + private final Optional<PeriodicQueryMetadata> periodicQueryMetadata; private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata; private final ImmutableMap<String, FilterMetadata> filterMetadata; private final ImmutableMap<String, JoinMetadata> joinMetadata; @@ -58,6 +59,7 @@ public class FluoQuery { * must use {@link Builder} instead. * * @param queryMetadata - The root node of a query that is updated in Fluo. (not null) + * @param periodicQueryMetadata - The periodic query node that is updated in Fluo. * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as * it is represented within the Fluo app. (not null) * @param filterMetadata - A map from Node ID to Filter metadata as it is represented @@ -69,6 +71,7 @@ public class FluoQuery { */ private FluoQuery( final QueryMetadata queryMetadata, + final Optional<PeriodicQueryMetadata> periodicQueryMetadata, final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, final ImmutableMap<String, FilterMetadata> filterMetadata, final ImmutableMap<String, JoinMetadata> joinMetadata, @@ -76,6 +79,7 @@ public class FluoQuery { this.aggregationMetadata = requireNonNull(aggregationMetadata); this.queryMetadata = Optional.of(requireNonNull(queryMetadata)); this.constructMetadata = Optional.absent(); + this.periodicQueryMetadata = periodicQueryMetadata; this.statementPatternMetadata = requireNonNull(statementPatternMetadata); this.filterMetadata = requireNonNull(filterMetadata); this.joinMetadata = requireNonNull(joinMetadata); @@ -88,23 +92,26 @@ public class FluoQuery { * must use {@link Builder} instead. * * @param constructMetadata - The root node of a query that is updated in Fluo. (not null) + * @param periodicQueryMetadata - The periodic query node that is updated in Fluo. * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as * it is represented within the Fluo app. (not null) * @param filterMetadata A map from Node ID to Filter metadata as it is represented * within the Fluo app. (not null) - * @param joinMetadata A map from Node ID to Join metadata as it is represented + * @param joinMetadata - A map from Node ID to Join metadata as it is represented * within the Fluo app. (not null) * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is * represented within the Fluo app. (not null) */ private FluoQuery( final ConstructQueryMetadata constructMetadata, + final Optional<PeriodicQueryMetadata> periodicQueryMetadata, final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata, final ImmutableMap<String, FilterMetadata> filterMetadata, final ImmutableMap<String, JoinMetadata> joinMetadata, final ImmutableMap<String, AggregationMetadata> aggregationMetadata) { this.constructMetadata = Optional.of(requireNonNull(constructMetadata)); this.queryMetadata = Optional.absent(); + this.periodicQueryMetadata = periodicQueryMetadata; this.statementPatternMetadata = requireNonNull(statementPatternMetadata); this.filterMetadata = requireNonNull(filterMetadata); this.joinMetadata = requireNonNull(joinMetadata); @@ -130,6 +137,13 @@ public class FluoQuery { public Optional<ConstructQueryMetadata> getConstructQueryMetadata() { return constructMetadata; } + + /** + * @return All of the Periodic Query metadata that is stored for the query. + */ + public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() { + return periodicQueryMetadata; + } /** * Get a Statement Pattern node's metadata. @@ -207,6 +221,7 @@ public class FluoQuery { public int hashCode() { return Objects.hashCode( queryMetadata, + periodicQueryMetadata, statementPatternMetadata, filterMetadata, joinMetadata, @@ -224,6 +239,7 @@ public class FluoQuery { return new EqualsBuilder() .append(queryMetadata, fluoQuery.queryMetadata) .append(constructMetadata, fluoQuery.constructMetadata) + .append(periodicQueryMetadata, fluoQuery.periodicQueryMetadata) .append(statementPatternMetadata, fluoQuery.statementPatternMetadata) .append(filterMetadata, fluoQuery.filterMetadata) .append(joinMetadata, fluoQuery.joinMetadata) @@ -247,6 +263,11 @@ public class FluoQuery { builder.append( constructMetadata.get().toString() ); builder.append("\n"); } + + if(periodicQueryMetadata.isPresent()) { + builder.append(periodicQueryMetadata.get()); + builder.append("\n"); + } for(final FilterMetadata metadata : filterMetadata.values()) { builder.append(metadata); @@ -286,6 +307,7 @@ public class FluoQuery { private QueryMetadata.Builder queryBuilder = null; private ConstructQueryMetadata.Builder constructBuilder = null; + private PeriodicQueryMetadata.Builder periodicQueryBuilder = null; private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>(); private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>(); private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>(); @@ -388,6 +410,17 @@ public class FluoQuery { } /** + * Get a Join builder from this builder. + * + * @param nodeId - The Node ID the Join builder was stored at. (not null) + * @return The builder that was stored at the node id if one was found. + */ + public Optional<JoinMetadata.Builder> getJoinBuilder(final String nodeId) { + requireNonNull(nodeId); + return Optional.fromNullable( joinBuilders.get(nodeId) ); + } + + /** * Get an Aggregate builder from this builder. * * @param nodeId - The Node ID the Aggregate builder was stored at. (not null) @@ -410,15 +443,28 @@ public class FluoQuery { return this; } + + /** - * Get a Join builder from this builder. + * Adds a new {@link PeriodicQueryMetadata.Builder} to this builder. * - * @param nodeId - The Node ID the Join builder was stored at. (not null) - * @return The builder that was stored at the node id if one was found. + * @param periodicQueryBuilder - A builder representing a specific Join within the query. (not null) + * @return This builder so that method invocation may be chained. */ - public Optional<JoinMetadata.Builder> getJoinBuilder(final String nodeId) { - requireNonNull(nodeId); - return Optional.fromNullable( joinBuilders.get(nodeId) ); + public Builder addPeriodicQueryMetadata(final PeriodicQueryMetadata.Builder periodicQueryBuilder) { + requireNonNull(periodicQueryBuilder); + this.periodicQueryBuilder = periodicQueryBuilder; + return this; + } + + + /** + * Get a PeriodicQuery builder from this builder. + * + * @return The PeriodicQuery builder if one has been set. + */ + public Optional<PeriodicQueryMetadata.Builder> getPeriodicQueryBuilder() { + return Optional.fromNullable( periodicQueryBuilder); } @@ -426,8 +472,19 @@ public class FluoQuery { * @return Creates a {@link FluoQuery} using the values that have been supplied to this builder. */ public FluoQuery build() { - Preconditions.checkArgument( - (queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null)); + checkArgument((queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null)); + + Optional<QueryMetadata.Builder> optionalQueryBuilder = getQueryBuilder(); + QueryMetadata queryMetadata = null; + if(optionalQueryBuilder.isPresent()) { + queryMetadata = optionalQueryBuilder.get().build(); + } + + Optional<PeriodicQueryMetadata.Builder> optionalPeriodicQueryBuilder = getPeriodicQueryBuilder(); + PeriodicQueryMetadata periodicQueryMetadata = null; + if(optionalPeriodicQueryBuilder.isPresent()) { + periodicQueryMetadata = optionalPeriodicQueryBuilder.get().build(); + } final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder(); for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) { @@ -450,11 +507,11 @@ public class FluoQuery { } if(queryBuilder != null) { - return new FluoQuery(queryBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + return new FluoQuery(queryBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); } //constructBuilder non-null in this case, but no need to check else { - return new FluoQuery(constructBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); + return new FluoQuery(constructBuilder.build(), Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build()); } }
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 3396114..ed18d49 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -63,14 +63,28 @@ import edu.umd.cs.findbugs.annotations.NonNull; * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:nodeId</td> <td>The Node ID of the Filter.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:veriableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> - * <tr> <td>Node ID</td> <td>filterMetadata:originalSparql</td> <td>The original SPRAQL query this filter was derived from.</td> </tr> - * <tr> <td>Node ID</td> <td>filterMetadata:filterIndexWithinSparql</td> <td>Indicates which filter within the original SPARQL query this represents.</td> </tr> + * <tr> <td>Node ID</td> <td>filterMetadata:filterSparql</td> <td>A SPARQL query representing this filter.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:parentNodeId</td> <td>The Node ID this filter emits Binding Sets to.</td> </tr> * <tr> <td>Node ID</td> <td>filterMetadata:childNodeId</td> <td>The Node ID of the node that feeds this node Binding Sets.</td> </tr> * <tr> <td>Node ID + DELIM + Binding Set String</td> <td>filterMetadata:bindingSet</td> <td>A {@link VisibilityBindingSet} object.</td> </tr> * </table> * </p> * <p> + * <b>Periodic Bin Metadata</b> + * <table border="1" style="width:100%"> + * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:nodeId</td> <td>The Node ID of the Filter.</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:variableOrder</td> <td>The Variable Order binding sets are emitted with.</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:period</td> <td>The period size used to form BindingSet bins.</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:windowSize</td> <td>The window size used to form BindingSet bins.</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:timeUnit</td> <td>The unit of time corresponding to period and window size.</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:temporalVariable</td> <td>The BindingSet variable corresponding to event time.</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:parentNodeId</td> <td>The parent node for this node.</td> </tr> + * <tr> <td>Node ID</td> <td>periodicQueryMetadata:childNodeId</td> <td>The child node for this node.</td> </tr> + * <tr> <td>Node ID + DELIM + Binding set String</td> <td>periodicQueryMetadata:bindingSet</td> <td>A binned BindingSet.</td> </tr> + * </table> + * </p> + * <p> * <b>Join Metadata</b> * <table border="1" style="width:100%"> * <tr> <th>Fluo Row</td> <th>Fluo Column</td> <th>Fluo Value</td> </tr> @@ -117,6 +131,7 @@ public class FluoQueryColumns { public static final String STATEMENT_PATTERN_METADATA_CF = "statementPatternMetadata"; public static final String AGGREGATION_METADATA_CF = "aggregationMetadata"; public static final String CONSTRUCT_METADATA_CF = "constructMetadata"; + public static final String PERIODIC_QUERY_METADATA_CF = "periodicQueryMetadata"; /** * New triples that have been added to Rya are written as a row in this @@ -174,13 +189,23 @@ public class FluoQueryColumns { // Filter Metadata columns. public static final Column FILTER_NODE_ID = new Column(FILTER_METADATA_CF, "nodeId"); - public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "veriableOrder"); - public static final Column FILTER_ORIGINAL_SPARQL = new Column(FILTER_METADATA_CF, "originalSparql"); - public static final Column FILTER_INDEX_WITHIN_SPARQL = new Column(FILTER_METADATA_CF, "filterIndexWithinSparql"); + public static final Column FILTER_VARIABLE_ORDER = new Column(FILTER_METADATA_CF, "variableOrder"); + public static final Column FILTER_SPARQL = new Column(FILTER_METADATA_CF, "filterSparql"); public static final Column FILTER_PARENT_NODE_ID = new Column(FILTER_METADATA_CF, "parentNodeId"); public static final Column FILTER_CHILD_NODE_ID = new Column(FILTER_METADATA_CF, "childNodeId"); public static final Column FILTER_BINDING_SET = new Column(FILTER_METADATA_CF, "bindingSet"); - + + // Periodic Bin Metadata columns. + public static final Column PERIODIC_QUERY_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "nodeId"); + public static final Column PERIODIC_QUERY_VARIABLE_ORDER = new Column(PERIODIC_QUERY_METADATA_CF, "variableOrder"); + public static final Column PERIODIC_QUERY_PARENT_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "parentNodeId"); + public static final Column PERIODIC_QUERY_CHILD_NODE_ID = new Column(PERIODIC_QUERY_METADATA_CF, "childNodeId"); + public static final Column PERIODIC_QUERY_BINDING_SET = new Column(PERIODIC_QUERY_METADATA_CF, "bindingSet"); + public static final Column PERIODIC_QUERY_PERIOD = new Column(PERIODIC_QUERY_METADATA_CF, "period"); + public static final Column PERIODIC_QUERY_WINDOWSIZE = new Column(PERIODIC_QUERY_METADATA_CF, "windowSize"); + public static final Column PERIODIC_QUERY_TIMEUNIT = new Column(PERIODIC_QUERY_METADATA_CF, "timeUnit"); + public static final Column PERIODIC_QUERY_TEMPORAL_VARIABLE = new Column(PERIODIC_QUERY_METADATA_CF, "temporalVariable"); + // Join Metadata columns. public static final Column JOIN_NODE_ID = new Column(JOIN_METADATA_CF, "nodeId"); public static final Column JOIN_VARIABLE_ORDER = new Column(JOIN_METADATA_CF, "variableOrder"); @@ -207,6 +232,18 @@ public class FluoQueryColumns { public static final Column AGGREGATION_BINDING_SET = new Column(AGGREGATION_METADATA_CF, "bindingSet"); /** + * BatchObserver column for processing tasks that need to be broken into + * batches. Entries stored stored in this column are of the form Row: + * nodeId, Value: BatchInformation. The nodeId indicates the node that the + * batch operation will be performed on. All batch operations are performed + * on the bindingSet column for the NodeType indicated by the given nodeId. + * For example, if the nodeId indicated that the NodeType was + * StatementPattern, then the batch operation would be performed on + * {@link FluoQueryColumns#STATEMENT_PATTERN_BINDING_SET}. + */ + public static final Column BATCH_COLUMN = new Column("batch","information"); + + /** * Enumerates the {@link Column}s that hold all of the fields for each type * of node that can compose a query. */ @@ -220,6 +257,20 @@ public class FluoQueryColumns { QUERY_VARIABLE_ORDER, QUERY_SPARQL, QUERY_CHILD_NODE_ID)), + + + /** + * The columns a {@link PeriodicBinMetadata} object's fields are stored within. + */ + PERIODIC_QUERY_COLUMNS( + Arrays.asList(PERIODIC_QUERY_NODE_ID, + PERIODIC_QUERY_VARIABLE_ORDER, + PERIODIC_QUERY_PERIOD, + PERIODIC_QUERY_WINDOWSIZE, + PERIODIC_QUERY_TIMEUNIT, + PERIODIC_QUERY_TEMPORAL_VARIABLE, + PERIODIC_QUERY_PARENT_NODE_ID, + PERIODIC_QUERY_CHILD_NODE_ID)), /** * The columns a {@link ConstructQueryMetadata} object's fields are stored within. @@ -239,8 +290,7 @@ public class FluoQueryColumns { FILTER_COLUMNS( Arrays.asList(FILTER_NODE_ID, FILTER_VARIABLE_ORDER, - FILTER_ORIGINAL_SPARQL, - FILTER_INDEX_WITHIN_SPARQL, + FILTER_SPARQL, FILTER_PARENT_NODE_ID, FILTER_CHILD_NODE_ID)), http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index 5e9d654..8675b80 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -26,6 +26,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Collection; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.fluo.api.client.SnapshotBase; import org.apache.fluo.api.client.TransactionBase; @@ -40,6 +41,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import com.google.common.base.Charsets; import com.google.common.base.Joiner; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -170,8 +172,7 @@ public class FluoQueryMetadataDAO { final String rowId = metadata.getNodeId(); tx.set(rowId, FluoQueryColumns.FILTER_NODE_ID, rowId); tx.set(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, metadata.getVariableOrder().toString()); - tx.set(rowId, FluoQueryColumns.FILTER_ORIGINAL_SPARQL, metadata.getOriginalSparql() ); - tx.set(rowId, FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, metadata.getFilterIndexWithinSparql()+"" ); + tx.set(rowId, FluoQueryColumns.FILTER_SPARQL, metadata.getFilterSparql() ); tx.set(rowId, FluoQueryColumns.FILTER_PARENT_NODE_ID, metadata.getParentNodeId() ); tx.set(rowId, FluoQueryColumns.FILTER_CHILD_NODE_ID, metadata.getChildNodeId() ); } @@ -195,8 +196,7 @@ public class FluoQueryMetadataDAO { final String rowId = nodeId; final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.FILTER_VARIABLE_ORDER, - FluoQueryColumns.FILTER_ORIGINAL_SPARQL, - FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL, + FluoQueryColumns.FILTER_SPARQL, FluoQueryColumns.FILTER_PARENT_NODE_ID, FluoQueryColumns.FILTER_CHILD_NODE_ID); @@ -204,18 +204,88 @@ public class FluoQueryMetadataDAO { final String varOrderString = values.get(FluoQueryColumns.FILTER_VARIABLE_ORDER); final VariableOrder varOrder = new VariableOrder(varOrderString); - final String originalSparql = values.get(FluoQueryColumns.FILTER_ORIGINAL_SPARQL); - final int filterIndexWithinSparql = Integer.parseInt(values.get(FluoQueryColumns.FILTER_INDEX_WITHIN_SPARQL)); + final String originalSparql = values.get(FluoQueryColumns.FILTER_SPARQL); final String parentNodeId = values.get(FluoQueryColumns.FILTER_PARENT_NODE_ID); final String childNodeId = values.get(FluoQueryColumns.FILTER_CHILD_NODE_ID); - return FilterMetadata.builder(nodeId) + return FilterMetadata.builder(nodeId).setVarOrder(varOrder).setFilterSparql(originalSparql) + .setParentNodeId(parentNodeId).setChildNodeId(childNodeId); + } + + /** + * Write an instance of {@link PeriodicQueryMetadata} to the Fluo table. + * + * @param tx + * - The transaction that will be used to commit the metadata. + * (not null) + * @param metadata + * - The PeriodicBin node metadata that will be written to the + * table. (not null) + */ + public void write(final TransactionBase tx, final PeriodicQueryMetadata metadata) { + requireNonNull(tx); + requireNonNull(metadata); + + final String rowId = metadata.getNodeId(); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_NODE_ID, rowId); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER, metadata.getVariableOrder().toString()); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID, metadata.getParentNodeId()); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID, metadata.getChildNodeId()); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_PERIOD, Long.toString(metadata.getPeriod())); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE, Long.toString(metadata.getWindowSize())); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT, metadata.getUnit().name()); + tx.set(rowId, FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE, metadata.getTemporalVariable()); + } + + /** + * Read an instance of {@link PeriodicQueryMetadata} from the Fluo table. + * + * @param sx + * - The snapshot that will be used to read the metadata. (not + * null) + * @param nodeId + * - The nodeId of the PeriodicBin node that will be read. (not + * null) + * @return The {@link PeriodicQueryMetadata} that was read from table. + */ + public PeriodicQueryMetadata readPeriodicQueryMetadata(final SnapshotBase sx, final String nodeId) { + return readPeriodicQueryMetadataBuilder(sx, nodeId).build(); + } + + private PeriodicQueryMetadata.Builder readPeriodicQueryMetadataBuilder(final SnapshotBase sx, final String nodeId) { + requireNonNull(sx); + requireNonNull(nodeId); + + // Fetch the values from the Fluo table. + final String rowId = nodeId; + final Map<Column, String> values = sx.gets(rowId, FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER, + FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID, FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID, + FluoQueryColumns.PERIODIC_QUERY_PERIOD, FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE, + FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT, FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE); + + // Return an object holding them. + final String varOrderString = values.get(FluoQueryColumns.PERIODIC_QUERY_VARIABLE_ORDER); + final VariableOrder varOrder = new VariableOrder(varOrderString); + final String parentNodeId = values.get(FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID); + final String childNodeId = values.get(FluoQueryColumns.PERIODIC_QUERY_CHILD_NODE_ID); + final String temporalVariable = values.get(FluoQueryColumns.PERIODIC_QUERY_TEMPORAL_VARIABLE); + final String period = values.get(FluoQueryColumns.PERIODIC_QUERY_PERIOD); + final String window = values.get(FluoQueryColumns.PERIODIC_QUERY_WINDOWSIZE); + final String timeUnit = values.get(FluoQueryColumns.PERIODIC_QUERY_TIMEUNIT); + + return PeriodicQueryMetadata.builder() + .setNodeId(nodeId) .setVarOrder(varOrder) - .setOriginalSparql(originalSparql) - .setFilterIndexWithinSparql(filterIndexWithinSparql) .setParentNodeId(parentNodeId) - .setChildNodeId(childNodeId); + .setChildNodeId(childNodeId) + .setWindowSize(Long.parseLong(window)) + .setPeriod(Long.parseLong(period)) + .setTemporalVariable(temporalVariable) + .setUnit(TimeUnit.valueOf(timeUnit)); + } + + /** * Write an instance of {@link JoinMetadata} to the Fluo table. @@ -325,12 +395,10 @@ public class FluoQueryMetadataDAO { final String pattern = values.get(FluoQueryColumns.STATEMENT_PATTERN_PATTERN); final String parentNodeId = values.get(FluoQueryColumns.STATEMENT_PATTERN_PARENT_NODE_ID); - return StatementPatternMetadata.builder(nodeId) - .setVarOrder(varOrder) - .setStatementPattern(pattern) - .setParentNodeId(parentNodeId); + return StatementPatternMetadata.builder(nodeId).setVarOrder(varOrder).setStatementPattern(pattern).setParentNodeId(parentNodeId); } + /** * Write an instance of {@link AggregationMetadata} to the Fluo table. * @@ -432,10 +500,11 @@ public class FluoQueryMetadataDAO { requireNonNull(query); // Write the rest of the metadata objects. - switch(query.getQueryType()) { + switch (query.getQueryType()) { case Construct: ConstructQueryMetadata constructMetadata = query.getConstructQueryMetadata().get(); - // Store the Query ID so that it may be looked up from the original SPARQL string. + // Store the Query ID so that it may be looked up from the original + // SPARQL string. final String constructSparql = constructMetadata.getSparql(); final String constructQueryId = constructMetadata.getNodeId(); tx.set(Bytes.of(constructSparql), FluoQueryColumns.QUERY_ID, Bytes.of(constructQueryId)); @@ -443,13 +512,19 @@ public class FluoQueryMetadataDAO { break; case Projection: QueryMetadata queryMetadata = query.getQueryMetadata().get(); - // Store the Query ID so that it may be looked up from the original SPARQL string. + // Store the Query ID so that it may be looked up from the original + // SPARQL string. final String sparql = queryMetadata.getSparql(); final String queryId = queryMetadata.getNodeId(); tx.set(Bytes.of(sparql), FluoQueryColumns.QUERY_ID, Bytes.of(queryId)); write(tx, queryMetadata); break; } + + Optional<PeriodicQueryMetadata> periodicMetadata = query.getPeriodicQueryMetadata(); + if(periodicMetadata.isPresent()) { + write(tx, periodicMetadata.get()); + } for(final FilterMetadata filter : query.getFilterMetadata()) { write(tx, filter); @@ -510,6 +585,15 @@ public class FluoQueryMetadataDAO { addChildMetadata(sx, builder, constructBuilder.build().getChildNodeId()); break; + case PERIODIC_QUERY: + // Add this node's metadata. + final PeriodicQueryMetadata.Builder periodicQueryBuilder = readPeriodicQueryMetadataBuilder(sx, childNodeId); + builder.addPeriodicQueryMetadata(periodicQueryBuilder); + + // Add it's child's metadata. + addChildMetadata(sx, builder, periodicQueryBuilder.build().getChildNodeId()); + break; + case AGGREGATION: // Add this node's metadata. final AggregationMetadata.Builder aggregationBuilder = readAggregationMetadataBuilder(sx, childNodeId); @@ -546,6 +630,7 @@ public class FluoQueryMetadataDAO { break; default: break; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java new file mode 100644 index 0000000..33253f2 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * Metadata that is required for periodic queries in the Rya Fluo Application. + * If a periodic query is registered with the Rya Fluo application, the BindingSets + * are placed into temporal bins according to whether they occur within the window of + * a period's ending time. This Metadata is used to create a Bin Id, which is equivalent + * to the period's ending time, to be inserted into each BindingSet that occurs within that + * bin. This is to allow the AggregationUpdater to aggregate the bins by grouping on the + * Bin Id. + * + */ +public class PeriodicQueryMetadata extends CommonNodeMetadata { + + private String parentNodeId; + private String childNodeId; + private long windowSize; + private long period; + private TimeUnit unit; + private String temporalVariable; + + /** + * Constructs an instance of PeriodicQueryMetadata + * @param nodeId - id of periodic query node + * @param varOrder - variable order indicating the order the BindingSet results are written in + * @param parentNodeId - id of parent node + * @param childNodeId - id of child node + * @param windowSize - size of window used for filtering + * @param period - period size that indicates frequency of notifications + * @param unit - TimeUnit corresponding to window and period + * @param temporalVariable - temporal variable that periodic conditions are applied to + */ + public PeriodicQueryMetadata(String nodeId, VariableOrder varOrder, String parentNodeId, String childNodeId, long windowSize, long period, + TimeUnit unit, String temporalVariable) { + super(nodeId, varOrder); + this.parentNodeId = Preconditions.checkNotNull(parentNodeId); + this.childNodeId = Preconditions.checkNotNull(childNodeId); + this.temporalVariable = Preconditions.checkNotNull(temporalVariable); + this.unit = Preconditions.checkNotNull(unit); + Preconditions.checkArgument(period > 0); + Preconditions.checkArgument(windowSize >= period); + + this.windowSize = windowSize; + this.period = period; + } + + /** + * @return id of parent for navigating query + */ + public String getParentNodeId() { + return parentNodeId; + } + + /** + * + * @return id of child for navigating query + */ + public String getChildNodeId() { + return childNodeId; + } + + /** + * + * @return temporal variable used for filtering events + */ + public String getTemporalVariable() { + return temporalVariable; + } + + /** + * @return window duration in millis + */ + public long getWindowSize() { + return windowSize; + } + + /** + * @return period duration in millis + */ + public long getPeriod() { + return period; + } + + /** + * @return {@link TimeUnit} for window duration and period duration + */ + public TimeUnit getUnit() { + return unit; + } + + + /** + * @return {@link Builder} for chaining method calls to construct an instance of PeriodicQueryMetadata. + */ + public static Builder builder() { + return new Builder(); + } + + @Override + public int hashCode() { + return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, parentNodeId, temporalVariable, period, windowSize, unit); + } + + @Override + public boolean equals(final Object o) { + if (o == this) { + return true; + } + + if (o instanceof PeriodicQueryMetadata) { + if (super.equals(o)) { + PeriodicQueryMetadata metadata = (PeriodicQueryMetadata) o; + return new EqualsBuilder().append(childNodeId, metadata.childNodeId).append(parentNodeId, metadata.parentNodeId) + .append(windowSize, metadata.windowSize).append(period, metadata.period) + .append(unit, metadata.unit).append(temporalVariable, metadata.temporalVariable).isEquals(); + } + return false; + } + + return false; + } + + @Override + public String toString() { + return new StringBuilder() + .append("PeriodicQueryMetadata {\n") + .append(" Node ID: " + super.getNodeId() + "\n") + .append(" Variable Order: " + super.getVariableOrder() + "\n") + .append(" Parent Node ID: " + parentNodeId + "\n") + .append(" Child Node ID: " + childNodeId + "\n") + .append(" Period: " + period + "\n") + .append(" Window Size: " + windowSize + "\n") + .append(" Time Unit: " + unit + "\n") + .append(" Temporal Variable: " + temporalVariable + "\n") + .append("}") + .toString(); + } + + + /** + * Builder for chaining method calls to construct an instance of PeriodicQueryMetadata. + */ + public static class Builder { + + private String nodeId; + private VariableOrder varOrder; + private String parentNodeId; + private String childNodeId; + private long windowSize; + private long period; + private TimeUnit unit; + public String temporalVariable; + + public Builder setNodeId(String nodeId) { + this.nodeId = nodeId; + return this; + } + + /** + * + * @return id of of this node + */ + public String getNodeId() { + return nodeId; + } + + /** + * Set the {@link VariableOrder} + * @param varOrder to indicate order that results will be written in + * @return Builder for chaining methods calls + */ + public Builder setVarOrder(VariableOrder varOrder) { + this.varOrder = varOrder; + return this; + } + + /** + * Returns {@link VariableOrder} + * @return VariableOrder that indicates order that results are written in + */ + public VariableOrder getVarOrder() { + return varOrder; + } + + /** + * Sets id of parent node + * @param parentNodeId + * @return Builder for chaining methods calls + */ + public Builder setParentNodeId(String parentNodeId) { + this.parentNodeId = parentNodeId; + return this; + } + + /** + * @return id of parent node + */ + public String getParentNodeId() { + return parentNodeId; + } + + /** + * Set id of child node + * @param childNodeId + * @return Builder for chaining methods calls + */ + public Builder setChildNodeId(String childNodeId) { + this.childNodeId = childNodeId; + return this; + } + + /** + * Sets window size for periodic query + * @param windowSize + * @return Builder for chaining methods calls + */ + public Builder setWindowSize(long windowSize) { + this.windowSize = windowSize; + return this; + } + + /** + * Sets period for periodic query + * @param period + * @return Builder for chaining methods calls + */ + public Builder setPeriod(long period) { + this.period = period; + return this; + } + + /** + * Sets time unit of window and period for periodic query + * @param unit + * @return Builder for chaining methods calls + */ + public Builder setUnit(TimeUnit unit) { + this.unit = unit; + return this; + } + + /** + * Indicate which variable in BindingSet results is the temporal variable that periodic + * Conditions should be applied to + * @param temporalVariable + * @return Builder for chaining methods calls + */ + public Builder setTemporalVariable(String temporalVariable) { + this.temporalVariable = temporalVariable; + return this; + } + + /** + * @return PeriodicQueryMetadata constructed from parameters passed to this Builder + */ + public PeriodicQueryMetadata build() { + return new PeriodicQueryMetadata(nodeId, varOrder, parentNodeId, childNodeId, windowSize, period, unit, temporalVariable); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java new file mode 100644 index 0000000..f1ade59 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.openrdf.query.algebra.QueryModelVisitor; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.UnaryTupleOperator; +import org.openrdf.query.algebra.evaluation.function.Function; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; + +/** + * This is a {@link UnaryTupleOperator} that gets placed in the parsed query + * {@link TupleExpr} when a {@link Filter} is encountered in the SPARQL String that + * contains the Periodic {@link Function} {@link PeriodicQueryUtil#PeriodicQueryURI}. + * The PeiodicQueryNode is created from the arguments passed to the Periodic Function, + * which consist of a time unit, a temporal period, a temporal window of time, and the + * temporal variable in the query, which assumes a value indicated by the + * Time ontology: http://www.w3.org/2006/time. The purpose of the PeriodicQueryNode + * is to filter out all events that did not occur within the specified window of time + * of this instant and to generate notifications at a regular interval indicated by the period. + * + */ +public class PeriodicQueryNode extends UnaryTupleOperator { + + private TimeUnit unit; + private long windowDuration; + private long periodDuration; + private String temporalVar; + + /** + * Creates a PeriodicQueryNode from the specified values. + * @param window - specifies the window of time that event must occur within from this instant + * @param period - regular interval at which notifications are generated (must be leq window). + * @param unit - time unit of the period and window + * @param temporalVar - temporal variable in query used for filtering + * @param arg - child of PeriodicQueryNode in parsed query + */ + public PeriodicQueryNode(long window, long period, TimeUnit unit, String temporalVar, TupleExpr arg) { + super(checkNotNull(arg)); + checkArgument(0 < period && period <= window); + this.temporalVar = checkNotNull(temporalVar); + this.unit = checkNotNull(unit); + this.windowDuration = window; + this.periodDuration = period; + } + + /** + * @return - temporal variable used to filter events + */ + public String getTemporalVariable() { + return temporalVar; + } + + /** + * @return window duration in millis + */ + public long getWindowSize() { + return windowDuration; + } + + /** + * @return period duration in millis + */ + public long getPeriod() { + return periodDuration; + } + + /** + * @return {@link TimeUnit} for window duration and period duration + */ + public TimeUnit getUnit() { + return unit; + } + + @Override + public <X extends Exception> void visit(QueryModelVisitor<X> visitor) throws X { + visitor.meetOther(this); + } + + @Override + public boolean equals(Object other) { + if(this == other) { + return true; + } + + if (other instanceof PeriodicQueryNode) { + if (super.equals(other)) { + PeriodicQueryNode metadata = (PeriodicQueryNode) other; + return new EqualsBuilder().append(windowDuration, metadata.windowDuration).append(periodDuration, metadata.periodDuration) + .append(unit, metadata.unit).append(temporalVar, metadata.temporalVar).isEquals(); + } + return false; + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash(arg, unit, windowDuration, periodDuration, temporalVar); + } + + /** + * @return String representation of this node that is printed in when query tree is printed. + */ + @Override + public String getSignature() { + StringBuilder sb = new StringBuilder(); + + sb.append("PeriodicQueryNode("); + sb.append("Var = " + temporalVar + ", "); + sb.append("Window = " + windowDuration + " ms, "); + sb.append("Period = " + periodDuration + " ms, "); + sb.append("Time Unit = " + unit + ")"); + + + return sb.toString(); + } + + @Override + public PeriodicQueryNode clone() { + PeriodicQueryNode clone = (PeriodicQueryNode)super.clone(); + clone.setArg(getArg().clone()); + clone.periodDuration = periodDuration; + clone.windowDuration = windowDuration; + clone.unit = unit; + clone.temporalVar = temporalVar; + return clone; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java index 23ac286..d017724 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java @@ -142,6 +142,10 @@ public class QueryMetadata extends CommonNodeMetadata { public Builder(final String nodeId) { this.nodeId = checkNotNull(nodeId); } + + public String getNodeId() { + return nodeId; + } /** @@ -154,6 +158,13 @@ public class QueryMetadata extends CommonNodeMetadata { this.varOrder = varOrder; return this; } + + /** + * @return the variable order of binding sets that are emitted by this node + */ + public VariableOrder getVariableOrder() { + return varOrder; + } /** * Set the SPARQL query whose results are being updated by the Fluo app. @@ -176,6 +187,10 @@ public class QueryMetadata extends CommonNodeMetadata { this.childNodeId = childNodeId; return this; } + + public String getChildNodeId() { + return childNodeId; + } /** * @return An instance of {@link QueryMetadata} build using this builder's values. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java index 631ce60..8e348f2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/SparqlFluoQueryBuilder.java @@ -18,12 +18,13 @@ */ package org.apache.rya.indexing.pcj.fluo.app.query; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.QUERY_PREFIX; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; @@ -40,12 +41,14 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection; -import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.NodeType; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; +import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.openrdf.model.Value; import org.openrdf.model.impl.BNodeImpl; @@ -105,7 +108,9 @@ public class SparqlFluoQueryBuilder { final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder(); final NewQueryVisitor visitor = new NewQueryVisitor(sparql, fluoQueryBuilder, nodeIds); - parsedQuery.getTupleExpr().visit( visitor ); + TupleExpr te = parsedQuery.getTupleExpr(); + PeriodicQueryUtil.placePeriodicQueryNode(te); + te.visit( visitor ); final FluoQuery fluoQuery = fluoQueryBuilder.build(); return fluoQuery; @@ -187,16 +192,17 @@ public class SparqlFluoQueryBuilder { prefix = AGGREGATION_PREFIX; } else if (node instanceof Reduced) { prefix = CONSTRUCT_PREFIX; + } else if(node instanceof PeriodicQueryNode) { + prefix = PERIODIC_QUERY_PREFIX; } else { throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass()); } - // Create the unique portion of the id. final String unique = UUID.randomUUID().toString().replaceAll("-", ""); - // Put them together to create the Node ID. return prefix + "_" + unique; } + } /** @@ -204,19 +210,13 @@ public class SparqlFluoQueryBuilder { * the node to a {@link FluoQuery.Builder}. This information is used by the * application's observers to incrementally update a PCJ. */ - private static class NewQueryVisitor extends QueryModelVisitorBase<RuntimeException> { + public static class NewQueryVisitor extends QueryModelVisitorBase<RuntimeException> { private final NodeIds nodeIds; private final FluoQuery.Builder fluoQueryBuilder; private final String sparql; /** - * Stored with each Filter node so that we can figure out how to evaluate it within - * {@link FilterResultUpdater}. Incremented each time a filter has been stored. - */ - private int filterIndexWithinQuery = 0; - - /** * Constructs an instance of {@link NewQueryVisitor}. * * @param sparql - The SPARQL query whose structure will be represented @@ -378,6 +378,7 @@ public class SparqlFluoQueryBuilder { @Override public void meet(final Filter node) { + // Get or create a builder for this node populated with the known metadata. final String filterId = nodeIds.getOrMakeId(node); @@ -387,8 +388,13 @@ public class SparqlFluoQueryBuilder { fluoQueryBuilder.addFilterMetadata(filterBuilder); } - filterBuilder.setOriginalSparql(sparql); - filterBuilder.setFilterIndexWithinSparql(filterIndexWithinQuery++); + String filterString; + try { + filterString = FilterSerializer.serialize(node); + } catch (FilterParseException e) { + throw new RuntimeException(e); + } + filterBuilder.setFilterSparql(filterString); final QueryModelNode child = node.getArg(); if(child == null) { @@ -406,6 +412,47 @@ public class SparqlFluoQueryBuilder { // Walk to the next node. super.meet(node); } + + public void meetOther(final QueryModelNode qNode) { + if (qNode instanceof PeriodicQueryNode) { + PeriodicQueryNode node = (PeriodicQueryNode) qNode; + // Get or create a builder for this node populated with the + // known metadata. + final String periodicId = nodeIds.getOrMakeId(node); + + PeriodicQueryMetadata.Builder periodicBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull(); + if (periodicBuilder == null) { + periodicBuilder = PeriodicQueryMetadata.builder(); + periodicBuilder.setNodeId(periodicId); + fluoQueryBuilder.addPeriodicQueryMetadata(periodicBuilder); + } + periodicBuilder.setWindowSize(node.getWindowSize()); + periodicBuilder.setPeriod(node.getPeriod()); + periodicBuilder.setTemporalVariable(node.getTemporalVariable()); + periodicBuilder.setUnit(node.getUnit()); + + final QueryModelNode child = node.getArg(); + if (child == null) { + throw new IllegalArgumentException("PeriodicQueryNode child arg connot be null."); + } + + final String childNodeId = nodeIds.getOrMakeId(child); + periodicBuilder.setChildNodeId(childNodeId); + + // Update the child node's metadata. + final Set<String> childVars = getVars((TupleExpr) child); + final VariableOrder childVarOrder = new VariableOrder(childVars); + setChildMetadata(childNodeId, childVarOrder, periodicId); + + // update variable order of this node and all ancestors to + // include BIN_ID binding as + // first variable in the ordering + PeriodicQueryUtil.updateVarOrdersToIncludeBin(fluoQueryBuilder, periodicId); + // Walk to the next node. + node.getArg().visit(this); + } + } + @Override public void meet(final Projection node) { @@ -553,10 +600,24 @@ public class SparqlFluoQueryBuilder { case QUERY: throw new IllegalArgumentException("A QUERY node cannot be the child of another node."); + case CONSTRUCT: throw new IllegalArgumentException("A CONSTRUCT node cannot be the child of another node."); + + case PERIODIC_QUERY: + PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull(); + if (periodicQueryBuilder == null) { + periodicQueryBuilder = PeriodicQueryMetadata.builder(); + periodicQueryBuilder.setNodeId(childNodeId); + fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder); + } + periodicQueryBuilder.setVarOrder(childVarOrder); + periodicQueryBuilder.setParentNodeId(parentNodeId); + break; + default: throw new IllegalArgumentException("Unsupported NodeType: " + childType); + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java new file mode 100644 index 0000000..73f3447 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FilterSerializer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import java.util.HashSet; +import java.util.Set; + +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.SingletonSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.ParsedTupleQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; + +/** + * Class for creating a String representation a given Filter, and for + * converting the String representation of the Filter back to the Filter. + * + */ +public class FilterSerializer { + + private static final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer(); + private static final SPARQLParser parser = new SPARQLParser(); + + /** + * Converts a {@link Filter} to a SPARQL query containing only the SPARQL representation + * of the Filter along with a Select clause that return all variables. The argument of the + * Filter is replaced by a {@link SingletonSet} so that the body of the SPARQL query consists of only a + * single Filter clause. + * @param filter - Filter to be serialized + * @return - SPARQL String containing a single Filter clause that represents the serialized Filter + * @throws FilterParseException + */ + public static String serialize(Filter filter) throws FilterParseException { + Filter clone = filter.clone(); + clone.setArg(new SingletonSet()); + try { + return renderer.render(new ParsedTupleQuery(clone)); + } catch (Exception e) { + throw new FilterParseException("Unable to parse Filter.", e); + } + } + + /** + * Converts a SPARQL query consisting of a single Filter clause back to a Filter. + * @param sparql - SPARQL query representing a Filter + * @return - parsed Filter included in the SPARQL query + * @throws FilterParseException + */ + public static Filter deserialize(String sparql) throws FilterParseException { + + try { + ParsedQuery pq = parser.parseQuery(sparql, null); + FilterVisitor visitor = new FilterVisitor(); + pq.getTupleExpr().visit(visitor); + Set<Filter> filters = visitor.getFilters(); + + if(filters.size() != 1) { + throw new FilterParseException("Filter String must contain only one Filter."); + } + + return filters.iterator().next(); + + } catch (Exception e) { + throw new FilterParseException("Unable to parse Filter.", e); + } + } + + public static class FilterVisitor extends QueryModelVisitorBase<RuntimeException> { + + private Set<Filter> filters; + + public FilterVisitor() { + filters = new HashSet<>(); + } + + public Set<Filter> getFilters() { + return filters; + } + + public void meet(Filter node) { + filters.add(node); + } + } + + public static class FilterParseException extends Exception { + + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link FilterParseException}. + * + * @param message - Explains why this exception is being thrown. + */ + public FilterParseException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link FilterParseException}. + * + * @param message - Explains why this exception is being thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public FilterParseException(final String message, final Throwable t) { + super(message, t); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java new file mode 100644 index 0000000..9446c87 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/FluoClientFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import java.util.Optional; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.config.FluoConfiguration; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; + +/** + * Factory for creating {@link FluoClient}s. + * + */ +public class FluoClientFactory { + + /** + * Creates a FluoClient + * @param appName - name of Fluo application + * @param tableName - name of Fluo table + * @param conf - AccumuloConfiguration (must contain Accumulo User, Accumulo Instance, Accumulo Password, and Accumulo Zookeepers) + * @return FluoClient for connecting to Fluo + */ + public static FluoClient getFluoClient(String appName, Optional<String> tableName, AccumuloRdfConfiguration conf) { + FluoConfiguration fluoConfig = new FluoConfiguration(); + fluoConfig.setAccumuloInstance(conf.getAccumuloInstance()); + fluoConfig.setAccumuloUser(conf.getAccumuloUser()); + fluoConfig.setAccumuloPassword(conf.getAccumuloPassword()); + fluoConfig.setInstanceZookeepers(conf.getAccumuloZookeepers() + "/fluo"); + fluoConfig.setAccumuloZookeepers(conf.getAccumuloZookeepers()); + fluoConfig.setApplicationName(appName); + if (tableName.isPresent()) { + fluoConfig.setAccumuloTable(tableName.get()); + } else { + fluoConfig.setAccumuloTable(appName); + } + return new FluoClientImpl(fluoConfig); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java new file mode 100644 index 0000000..fd24af2 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/PeriodicQueryUtil.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.Group; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.Reduced; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.UnaryTupleOperator; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * Utility class for creating and executing Perioid Queries. + * + */ +public class PeriodicQueryUtil { + + private static final ValueFactory vf = new ValueFactoryImpl(); + public static final String PeriodicQueryURI = "http://org.apache.rya/function#periodic"; + public static final String temporalNameSpace = "http://www.w3.org/2006/time#"; + public static final URI DAYS = vf.createURI("http://www.w3.org/2006/time#days"); + public static final URI HOURS = vf.createURI("http://www.w3.org/2006/time#hours"); + public static final URI MINUTES = vf.createURI("http://www.w3.org/2006/time#minutes"); + + /** + * Returns a PeriodicQueryNode for all {@link FunctionCall}s that represent PeriodicQueryNodes, otherwise + * an empty Optional is returned. + * @param functionCall - FunctionCall taken from a {@lin TupleExpr} + * @param arg - TupleExpr that will be the argument of the PeriodicQueryNode if it is created + * @return - Optional containing a PeriodicQueryNode if FunctionCall represents PeriodicQueryNode and empty Optional otherwise + * @throws Exception + */ + public static Optional<PeriodicQueryNode> getPeriodicQueryNode(FunctionCall functionCall, TupleExpr arg) throws Exception { + + if (functionCall.getURI().equals(PeriodicQueryURI)) { + return Optional.of(parseAndSetValues(functionCall.getArgs(), arg)); + } + + return Optional.empty(); + } + + /** + * Finds and places a PeriodicQueryNode if the TupleExpr contains a FunctionCall + * that represents a PeriodicQueryNode. + * @param query - TupleExpr with PeriodicQueryNode placed and positioned at the top of the query + */ + public static void placePeriodicQueryNode(TupleExpr query) { + query.visit(new PeriodicQueryNodeVisitor()); + query.visit(new PeriodicQueryNodeRelocator()); + } + + public static Optional<PeriodicQueryNode> getPeriodicNode(String sparql) throws MalformedQueryException { + TupleExpr te = new SPARQLParser().parseQuery(sparql, null).getTupleExpr(); + PeriodicQueryNodeVisitor periodicVisitor = new PeriodicQueryNodeVisitor(); + te.visit(periodicVisitor); + return periodicVisitor.getPeriodicNode(); + } + + /** + * Locates Filter containing FunctionCall with PeriodicQuery info and + * replaces that Filter with a PeriodicQueryNode. + */ + public static class PeriodicQueryNodeVisitor extends QueryModelVisitorBase<RuntimeException> { + + private int count = 0; + private PeriodicQueryNode periodicNode; + + public Optional<PeriodicQueryNode> getPeriodicNode() { + return Optional.ofNullable(periodicNode); + } + + public void meet(Filter node) { + if (node.getCondition() instanceof FunctionCall) { + try { + Optional<PeriodicQueryNode> optNode = getPeriodicQueryNode((FunctionCall) node.getCondition(), node.getArg()); + if (optNode.isPresent()) { + if (count > 0) { + throw new IllegalArgumentException("Query cannot contain more than one PeriodicQueryNode"); + } + periodicNode = optNode.get(); + node.replaceWith(periodicNode); + count++; + periodicNode.visit(this); + } else { + super.meet(node); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + } else { + super.meet(node); + } + } + } + + /** + * Relocates PeriodicQueryNode so that it occurs below either the Construct + * Query Node, the Projection Query Node if no Aggregation exists, or the + * Group Node if an Aggregation exists. This limits the number of nodes + * whose variable order needs to be changed when the PeriodicQueryMetadata + * is added. + */ + public static class PeriodicQueryNodeRelocator extends QueryModelVisitorBase<RuntimeException> { + + private UnaryTupleOperator relocationParent; + + public void meet(Projection node) { + relocationParent = node; + node.getArg().visit(this); + } + + public void meet(Group node) { + relocationParent = node; + super.meet(node); + } + + public void meet(Reduced node) { + relocationParent = node; + super.meet(node); + } + + public void meet(Filter node) { + super.meet(node); + } + + @Override + public void meetOther(QueryModelNode node) { + + if (node instanceof PeriodicQueryNode) { + PeriodicQueryNode pNode = (PeriodicQueryNode) node; + // do nothing if PeriodicQueryNode already positioned correctly + if (pNode.equals(relocationParent.getArg())) { + return; + } + // remove node from query + pNode.replaceWith(pNode.getArg()); + // set node' child to be relocationParent's child + pNode.setArg(relocationParent.getArg()); + // add node back into query below relocationParent + relocationParent.replaceChildNode(relocationParent.getArg(), pNode); + } + } + } + + /** + * Adds the variable "periodicBinId" to the beginning of all {@link VariableOrder}s for the + * Metadata nodes that appear above the PeriodicQueryNode. This ensures that the binId is + * written first in the Row so that bins can be easily scanned and deleted. + * @param builder + * @param nodeId + */ + public static void updateVarOrdersToIncludeBin(FluoQuery.Builder builder, String nodeId) { + NodeType type = NodeType.fromNodeId(nodeId).orNull(); + if (type == null) { + throw new IllegalArgumentException("NodeId must be associated with an existing MetadataBuilder."); + } + switch (type) { + case AGGREGATION: + AggregationMetadata.Builder aggBuilder = builder.getAggregateBuilder(nodeId).orNull(); + if (aggBuilder != null) { + VariableOrder varOrder = aggBuilder.getVariableOrder(); + VariableOrder groupOrder = aggBuilder.getGroupByVariableOrder(); + // update varOrder with BIN_ID + List<String> orderList = new ArrayList<>(varOrder.getVariableOrders()); + orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); + aggBuilder.setVariableOrder(new VariableOrder(orderList)); + // update groupVarOrder with BIN_ID + List<String> groupOrderList = new ArrayList<>(groupOrder.getVariableOrders()); + groupOrderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); + aggBuilder.setGroupByVariableOrder(new VariableOrder(groupOrderList)); + // recursive call to update the VariableOrders of all ancestors + // of this node + updateVarOrdersToIncludeBin(builder, aggBuilder.getParentNodeId()); + } else { + throw new IllegalArgumentException("There is no AggregationMetadata.Builder for the indicated Id."); + } + break; + case PERIODIC_QUERY: + PeriodicQueryMetadata.Builder periodicBuilder = builder.getPeriodicQueryBuilder().orNull(); + if (periodicBuilder != null && periodicBuilder.getNodeId().equals(nodeId)) { + VariableOrder varOrder = periodicBuilder.getVarOrder(); + List<String> orderList = new ArrayList<>(varOrder.getVariableOrders()); + orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); + periodicBuilder.setVarOrder(new VariableOrder(orderList)); + // recursive call to update the VariableOrders of all ancestors + // of this node + updateVarOrdersToIncludeBin(builder, periodicBuilder.getParentNodeId()); + } else { + throw new IllegalArgumentException( + "PeriodicQueryMetadata.Builder id does not match the indicated id. A query cannot have more than one PeriodicQueryMetadata Node."); + } + break; + case QUERY: + QueryMetadata.Builder queryBuilder = builder.getQueryBuilder().orNull(); + if (queryBuilder != null && queryBuilder.getNodeId().equals(nodeId)) { + VariableOrder varOrder = queryBuilder.getVariableOrder(); + List<String> orderList = new ArrayList<>(varOrder.getVariableOrders()); + orderList.add(0, IncrementalUpdateConstants.PERIODIC_BIN_ID); + queryBuilder.setVariableOrder(new VariableOrder(orderList)); + } else { + throw new IllegalArgumentException( + "QueryMetadata.Builder id does not match the indicated id. A query cannot have more than one QueryMetadata Node."); + } + break; + default: + throw new IllegalArgumentException( + "Incorrectly positioned PeriodicQueryNode. The PeriodicQueryNode can only be positioned below Projections, Extensions, and ConstructQueryNodes."); + } + } + + /** + * Collects all Metadata node Ids that are ancestors of the PeriodicQueryNode and contain the variable + * {@link IncrementalUpdateConstants#PERIODIC_BIN_ID}. + * @param sx - Fluo Snapshot for scanning Fluo + * @param nodeId - root node of the PeriodicQuery + * @param ids - query ids of all metadata nodes appearing between root and PeriodicQueryMetadata node + */ + public static void getPeriodicQueryNodeAncestorIds(SnapshotBase sx, String nodeId, Set<String> ids) { + NodeType nodeType = NodeType.fromNodeId(nodeId).orNull(); + checkArgument(nodeType != null, "Invalid nodeId: " + nodeId + ". NodeId does not correspond to a valid NodeType."); + switch (nodeType) { + case FILTER: + ids.add(nodeId); + getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.FILTER_CHILD_NODE_ID).toString(), ids); + break; + case PERIODIC_QUERY: + ids.add(nodeId); + break; + case QUERY: + ids.add(nodeId); + getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.QUERY_CHILD_NODE_ID).toString(), ids); + break; + case AGGREGATION: + ids.add(nodeId); + getPeriodicQueryNodeAncestorIds(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_CHILD_NODE_ID).toString(), ids); + break; + default: + throw new RuntimeException("Invalid NodeType."); + } + } + + + + /** + * + * @param values - Values extracted from FunctionCall representing the PeriodicQuery Filter + * @param arg - Argument of the PeriodicQueryNode that will be created (PeriodicQueryNode is a UnaryTupleOperator) + * @return - PeriodicQueryNode to be inserted in place of the original FunctionCall + * @throws Exception + */ + private static PeriodicQueryNode parseAndSetValues(List<ValueExpr> values, TupleExpr arg) throws Exception { + // general validation of input + Preconditions.checkArgument(values.size() == 4); + Preconditions.checkArgument(values.get(0) instanceof Var); + Preconditions.checkArgument(values.get(1) instanceof ValueConstant); + Preconditions.checkArgument(values.get(2) instanceof ValueConstant); + Preconditions.checkArgument(values.get(3) instanceof ValueConstant); + + // get temporal variable + Var var = (Var) values.get(0); + Preconditions.checkArgument(var.getValue() == null); + String tempVar = var.getName(); + + // get TimeUnit + TimeUnit unit = getTimeUnit((ValueConstant) values.get(3)); + + // get window and period durations + double windowDuration = parseTemporalDuration((ValueConstant) values.get(1)); + double periodDuration = parseTemporalDuration((ValueConstant) values.get(2)); + long windowMillis = convertToMillis(windowDuration, unit); + long periodMillis = convertToMillis(periodDuration, unit); + // period must evenly divide window at least once + Preconditions.checkArgument(windowMillis > periodMillis); + Preconditions.checkArgument(windowMillis % periodMillis == 0, "Period duration does not evenly divide window duration."); + + // create PeriodicMetadata.Builder + return new PeriodicQueryNode(windowMillis, periodMillis, TimeUnit.MILLISECONDS, tempVar, arg); + } + + private static TimeUnit getTimeUnit(ValueConstant val) { + Preconditions.checkArgument(val.getValue() instanceof URI); + URI uri = (URI) val.getValue(); + Preconditions.checkArgument(uri.getNamespace().equals(temporalNameSpace)); + + switch (uri.getLocalName()) { + case "days": + return TimeUnit.DAYS; + case "hours": + return TimeUnit.HOURS; + case "minutes": + return TimeUnit.MINUTES; + default: + throw new IllegalArgumentException("Invalid time unit for Periodic Function."); + } + } + + private static double parseTemporalDuration(ValueConstant valConst) { + Value val = valConst.getValue(); + Preconditions.checkArgument(val instanceof Literal); + Literal literal = (Literal) val; + String stringVal = literal.getLabel(); + URI dataType = literal.getDatatype(); + Preconditions.checkArgument(dataType.equals(XMLSchema.DECIMAL) || dataType.equals(XMLSchema.DOUBLE) + || dataType.equals(XMLSchema.FLOAT) || dataType.equals(XMLSchema.INTEGER) || dataType.equals(XMLSchema.INT)); + return Double.parseDouble(stringVal); + } + + private static long convertToMillis(double duration, TimeUnit unit) { + Preconditions.checkArgument(duration > 0); + + double convertedDuration = 0; + switch (unit) { + case DAYS: + convertedDuration = duration * 24 * 60 * 60 * 1000; + break; + case HOURS: + convertedDuration = duration * 60 * 60 * 1000; + break; + case MINUTES: + convertedDuration = duration * 60 * 1000; + break; + default: + throw new IllegalArgumentException("TimeUnit must be of type DAYS, HOURS, or MINUTES."); + } + // check that double representation has exact millis representation + Preconditions.checkArgument(convertedDuration == (long) convertedDuration); + return (long) convertedDuration; + } + +}