[GitHub] incubator-rya pull request #240: Rya 374

2017-11-21 Thread meiercaleb
Github user meiercaleb closed the pull request at:

https://github.com/apache/incubator-rya/pull/240


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-20 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145972309
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/AggregationStateManager.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.query.CommonNodeMetadataImpl;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Class that performs aggregation state lookups while maintaining a cache
+ * of old lookups.  This class uses a single transaction to perform a 
number
+ * of aggregation state lookups.
+ */
+public class AggregationStateManager {
+
+private String nodeId;
+private VariableOrder varOrder;
+private TransactionBase tx;
+private Map aggregationStateMap = new HashMap<>();
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145812197
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/StopNodeVisitor.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryBuilderVisitorBase;
+
+/**
+ *  Base class that enables all visitors extending this class to stop 
traversing
+ *  the Builder tree at a prescribed node.  Any visitor extending this 
class will
+ *  traverse the visitor tree and process all nodes up to and including 
the prescribed stop
+ *  node.  
+ */
+public abstract class StopNodeVisitor extends QueryBuilderVisitorBase {
+
+private String stopNodeId;
+private boolean processedStopNode = false;
+
+public StopNodeVisitor(FluoQuery.Builder fluoBuilder,String nodeId) {
+super(fluoBuilder);
+this.stopNodeId = checkNotNull(nodeId);
+}
+
+@Override
+public void visitNode(String nodeId) {
+//process the stop node, then stop traversing
+if(!processedStopNode) {
+processedStopNode = atStopNode(nodeId);
+super.visitNode(nodeId);
+}
+}
+
+boolean atStopNode(String nodeId) {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145811793
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/AggregationStateMetadataVisitor.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static jline.internal.Preconditions.checkNotNull;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.CommonNodeMetadataImpl;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+
+/**
+ *  This builder visitor inserts the aggregation metadata into all of its
+ *  ancestors.
+ */
+public class AggregationStateMetadataVisitor extends StopNodeVisitor {
+
+private CommonNodeMetadataImpl aggStateMeta;
+
+public AggregationStateMetadataVisitor(FluoQuery.Builder 
fluoQueryBuilder, String stopNodeId, CommonNodeMetadataImpl aggStateMeta) {
+super(fluoQueryBuilder, stopNodeId);
+this.aggStateMeta = checkNotNull(aggStateMeta);
+}
+
+public void visit(QueryMetadata.Builder builder) {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145811379
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StateNodeMetadata.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Optional;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+
+/**
+ * Metadata class that is meant to be extended by any metadata node that 
could have Aggregation State.
+ *
+ */
+public class StateNodeMetadata extends CommonNodeMetadata {
+
+private Optional stateMetadata;
+
+/**
+ * Creates a StateNodeMetadata object with empty aggregation metadata.
+ * @param nodeId 
+ * @param varOrder
+ */
+public StateNodeMetadata(String nodeId, VariableOrder varOrder) {
+this(nodeId, varOrder, Optional.empty());
+}
+
+/**
+ * Creates a StateNodeMetadata object with indicated aggregation 
metadata.
+ * @param nodeId
+ * @param varOrder 
+ * @param aggregationStateMetadata - aggregation metadata to look up 
aggregation state
+ */
+public StateNodeMetadata(String nodeId, VariableOrder varOrder, 
Optional aggregationStateMetadata) {
+super(nodeId, varOrder);
+this.stateMetadata = aggregationStateMetadata;
+}
+
+/**
+ * @return - Optional containing aggregation metadata to look up 
aggregation state (if it exists)
+ */
+public Optional getStateMetadata() {
+return stateMetadata;
+}
+
+@Override
+public int hashCode() {
+return Objects.hashCode(super.getNodeId(), 
super.getVariableOrder(), stateMetadata);
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+
+if (o instanceof StateNodeMetadata) {
+if (super.equals(o)) {
+final StateNodeMetadata metadata = (StateNodeMetadata) o;
+return new EqualsBuilder().append(stateMetadata, 
metadata.stateMetadata).isEquals();
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145810960
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StateNodeMetadata.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Optional;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+
+/**
+ * Metadata class that is meant to be extended by any metadata node that 
could have Aggregation State.
+ *
+ */
+public class StateNodeMetadata extends CommonNodeMetadata {
+
+private Optional stateMetadata;
+
+/**
+ * Creates a StateNodeMetadata object with empty aggregation metadata.
+ * @param nodeId 
+ * @param varOrder
+ */
+public StateNodeMetadata(String nodeId, VariableOrder varOrder) {
+this(nodeId, varOrder, Optional.empty());
+}
+
+/**
+ * Creates a StateNodeMetadata object with indicated aggregation 
metadata.
+ * @param nodeId
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145810475
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
 ---
@@ -215,6 +220,20 @@ public Builder setVarOrder(@Nullable final 
VariableOrder varOrder) {
 public VariableOrder getVariableOrder() {
 return varOrder;
 }
+
+/**
+ * Sets the Aggregation State.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145810280
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
 ---
@@ -271,6 +275,20 @@ public Builder setUnit(TimeUnit unit) {
 }
 
 /**
+ * Sets the Aggregation State.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145810151
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
 ---
@@ -224,6 +231,20 @@ public VariableOrder getVariableOrder() {
 }
 
 /**
+ * Sets the Aggregation State.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145809880
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
 ---
@@ -199,13 +206,27 @@ public Builder setParentNodeId(String parentNodeId) {
 this.parentNodeId = parentNodeId;
 return this;
 }
+
+/**
+ * Sets the state metadata of this {@link ConstructQueryMetadata}.
+ * @param stateMetadata
+ * @return This builder so that method invocations may be chained.
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl 
stateMetadata) {
+this.state = stateMetadata;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145810046
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
 ---
@@ -221,6 +228,20 @@ public Builder setChildNodeId(@Nullable final String 
childNodeId) {
 public String getChildNodeId() {
 return childNodeId;
 }
+
+/**
+ * Sets the Aggregation State for this Filter node.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145809414
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadataImpl.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Implementation of {@link CommonNodeMetadata} for storing metadata 
nodeIds and
+ * {@link VariableOrder}s.  This class is used primarily for storing 
Aggregation State 
+ * Metadata.
+ */
+public class CommonNodeMetadataImpl extends CommonNodeMetadata {
+
+public CommonNodeMetadataImpl(String nodeId, VariableOrder varOrder) {
+super(nodeId, varOrder);
+}
+
+public CommonNodeMetadataImpl(CommonNodeMetadataImpl metadata) {
+super(metadata.getNodeId(), metadata.getVariableOrder());
+}
+
+/**
+ * Reads/Writes instances of {@link CommonNodeMetadataImpl} to/from 
Json Strings.
+ */
+public static class CommonNodeMetadataSerDe {
+
+private static Logger log = 
Logger.getLogger(CommonNodeMetadataSerDe.class);
+private static Gson gson = new 
GsonBuilder().registerTypeAdapter(CommonNodeMetadataImpl.class, new 
CommonNodeTypeAdapter())
+.create();
+
+public static String serialize(CommonNodeMetadataImpl metadata) {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145809455
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadataImpl.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Implementation of {@link CommonNodeMetadata} for storing metadata 
nodeIds and
+ * {@link VariableOrder}s.  This class is used primarily for storing 
Aggregation State 
+ * Metadata.
+ */
+public class CommonNodeMetadataImpl extends CommonNodeMetadata {
+
+public CommonNodeMetadataImpl(String nodeId, VariableOrder varOrder) {
+super(nodeId, varOrder);
+}
+
+public CommonNodeMetadataImpl(CommonNodeMetadataImpl metadata) {
+super(metadata.getNodeId(), metadata.getVariableOrder());
+}
+
+/**
+ * Reads/Writes instances of {@link CommonNodeMetadataImpl} to/from 
Json Strings.
+ */
+public static class CommonNodeMetadataSerDe {
+
+private static Logger log = 
Logger.getLogger(CommonNodeMetadataSerDe.class);
+private static Gson gson = new 
GsonBuilder().registerTypeAdapter(CommonNodeMetadataImpl.class, new 
CommonNodeTypeAdapter())
+.create();
+
+public static String serialize(CommonNodeMetadataImpl metadata) {
+try {
+return gson.toJson(metadata);
+} catch (Exception e) {
+log.info("Unable to serialize BatchInformation: " + 
metadata);
+throw new RuntimeException(e);
+}
+}
+
+public static Optional deserialize(String 
json) {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145808226
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
 ---
@@ -65,6 +67,14 @@ public String getNodeId() {
 public VariableOrder getVariableOrder() {
 return varOrder;
 }
+
+/**
+ * Sets the VariableOrder for this node. Allows the VariableOrder to 
be updated.
+ * @param varOrder - VariableOrder for this metadata node.
+ */
+public void setVariableOrder(VariableOrder varOrder) {
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145807979
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
 ---
@@ -60,6 +61,10 @@ public JsonElement serialize(JoinBatchInformation batch, 
Type typeOfSrc, JsonSer
 result.add("span", new JsonPrimitive(span.getStart().getsRow() + 
"\u" + span.getEnd().getsRow()));
 result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
 result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+if(batch.getAggregationStateMeta().isPresent()) {
+CommonNodeMetadataImpl stateMeta = 
batch.getAggregationStateMeta().get();
+result.add("aggStateMeta", new 
JsonPrimitive(stateMeta.getNodeId() + "\u" + 
stateMeta.getVariableOrder().toString()));
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145807643
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -1,4 +1,6 @@
 package org.apache.rya.indexing.pcj.fluo.app.batch;
+import static jline.internal.Preconditions.checkNotNull;
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-19 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r145807342
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
 ---
@@ -74,53 +76,60 @@ public void processBatch(TransactionBase tx, Bytes row, 
BatchInformation batch)
 Preconditions.checkArgument(batch instanceof JoinBatchInformation);
 JoinBatchInformation joinBatch = (JoinBatchInformation) batch;
 Task task = joinBatch.getTask();
+VisibilityBindingSet bs = joinBatch.getBs();
 
-// Figure out which join algorithm we are going to use.
-final IterativeJoin joinAlgorithm;
-switch (joinBatch.getJoinType()) {
-case NATURAL_JOIN:
-joinAlgorithm = new NaturalJoin();
-break;
-case LEFT_OUTER_JOIN:
-joinAlgorithm = new LeftOuterJoin();
-break;
-default:
-throw new RuntimeException("Unsupported JoinType: " + 
joinBatch.getJoinType());
+// create aggregation state manager is aggregation state metadata 
exists
--- End diff --

Done


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143866935
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchBindingSetUpdater.java
 ---
@@ -74,53 +76,60 @@ public void processBatch(TransactionBase tx, Bytes row, 
BatchInformation batch)
 Preconditions.checkArgument(batch instanceof JoinBatchInformation);
 JoinBatchInformation joinBatch = (JoinBatchInformation) batch;
 Task task = joinBatch.getTask();
+VisibilityBindingSet bs = joinBatch.getBs();
 
-// Figure out which join algorithm we are going to use.
-final IterativeJoin joinAlgorithm;
-switch (joinBatch.getJoinType()) {
-case NATURAL_JOIN:
-joinAlgorithm = new NaturalJoin();
-break;
-case LEFT_OUTER_JOIN:
-joinAlgorithm = new LeftOuterJoin();
-break;
-default:
-throw new RuntimeException("Unsupported JoinType: " + 
joinBatch.getJoinType());
+// create aggregation state manager is aggregation state metadata 
exists
--- End diff --

if, not is


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143870656
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/AggregationStateManager.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.query.CommonNodeMetadataImpl;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Class that performs aggregation state lookups while maintaining a cache
+ * of old lookups.  This class uses a single transaction to perform a 
number
+ * of aggregation state lookups.
+ */
+public class AggregationStateManager {
+
+private String nodeId;
+private VariableOrder varOrder;
+private TransactionBase tx;
+private Map aggregationStateMap = new HashMap<>();
--- End diff --

Little worried that there is no eviction policy on this map within the 
single transaction. If this has a max size though, that would alleviate some of 
that.


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869294
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java
 ---
@@ -271,6 +275,20 @@ public Builder setUnit(TimeUnit unit) {
 }
 
 /**
+ * Sets the Aggregation State.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Docs.


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869338
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ProjectionMetadata.java
 ---
@@ -225,12 +232,26 @@ public Builder setProjectedVars(VariableOrder 
projectedVars) {
 public VariableOrder getProjectionVars() {
 return projectedVars;
 }
+
+/**
+ * Sets the Aggregation State.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143870730
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/AggregationStateManager.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.query.CommonNodeMetadataImpl;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Class that performs aggregation state lookups while maintaining a cache
+ * of old lookups.  This class uses a single transaction to perform a 
number
+ * of aggregation state lookups.
+ */
+public class AggregationStateManager {
+
+private String nodeId;
+private VariableOrder varOrder;
+private TransactionBase tx;
+private Map aggregationStateMap = new HashMap<>();
--- End diff --

Maybe do Least Recently Used.


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143867818
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadataImpl.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Implementation of {@link CommonNodeMetadata} for storing metadata 
nodeIds and
+ * {@link VariableOrder}s.  This class is used primarily for storing 
Aggregation State 
+ * Metadata.
+ */
+public class CommonNodeMetadataImpl extends CommonNodeMetadata {
+
+public CommonNodeMetadataImpl(String nodeId, VariableOrder varOrder) {
--- End diff --

Docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143867158
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 ---
@@ -1,4 +1,6 @@
 package org.apache.rya.indexing.pcj.fluo.app.batch;
+import static jline.internal.Preconditions.checkNotNull;
--- End diff --

This is probably supposed to be a different impl of this method.


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143868002
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadataImpl.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Implementation of {@link CommonNodeMetadata} for storing metadata 
nodeIds and
+ * {@link VariableOrder}s.  This class is used primarily for storing 
Aggregation State 
+ * Metadata.
+ */
+public class CommonNodeMetadataImpl extends CommonNodeMetadata {
+
+public CommonNodeMetadataImpl(String nodeId, VariableOrder varOrder) {
+super(nodeId, varOrder);
+}
+
+public CommonNodeMetadataImpl(CommonNodeMetadataImpl metadata) {
+super(metadata.getNodeId(), metadata.getVariableOrder());
+}
+
+/**
+ * Reads/Writes instances of {@link CommonNodeMetadataImpl} to/from 
Json Strings.
+ */
+public static class CommonNodeMetadataSerDe {
+
+private static Logger log = 
Logger.getLogger(CommonNodeMetadataSerDe.class);
+private static Gson gson = new 
GsonBuilder().registerTypeAdapter(CommonNodeMetadataImpl.class, new 
CommonNodeTypeAdapter())
+.create();
+
+public static String serialize(CommonNodeMetadataImpl metadata) {
--- End diff --

Docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143867713
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadata.java
 ---
@@ -65,6 +67,14 @@ public String getNodeId() {
 public VariableOrder getVariableOrder() {
 return varOrder;
 }
+
+/**
+ * Sets the VariableOrder for this node. Allows the VariableOrder to 
be updated.
+ * @param varOrder - VariableOrder for this metadata node.
+ */
+public void setVariableOrder(VariableOrder varOrder) {
--- End diff --

@Nullable


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143867467
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/JoinBatchInformationTypeAdapter.java
 ---
@@ -60,6 +61,10 @@ public JsonElement serialize(JoinBatchInformation batch, 
Type typeOfSrc, JsonSer
 result.add("span", new JsonPrimitive(span.getStart().getsRow() + 
"\u" + span.getEnd().getsRow()));
 result.add("startInc", new JsonPrimitive(span.isStartInclusive()));
 result.add("endInc", new JsonPrimitive(span.isEndInclusive()));
+if(batch.getAggregationStateMeta().isPresent()) {
+CommonNodeMetadataImpl stateMeta = 
batch.getAggregationStateMeta().get();
+result.add("aggStateMeta", new 
JsonPrimitive(stateMeta.getNodeId() + "\u" + 
stateMeta.getVariableOrder().toString()));
--- End diff --

Maybe move the null character somewhere common.


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869517
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StateNodeMetadata.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Optional;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+
+/**
+ * Metadata class that is meant to be extended by any metadata node that 
could have Aggregation State.
+ *
+ */
+public class StateNodeMetadata extends CommonNodeMetadata {
+
+private Optional stateMetadata;
+
+/**
+ * Creates a StateNodeMetadata object with empty aggregation metadata.
+ * @param nodeId 
+ * @param varOrder
+ */
+public StateNodeMetadata(String nodeId, VariableOrder varOrder) {
+this(nodeId, varOrder, Optional.empty());
+}
+
+/**
+ * Creates a StateNodeMetadata object with indicated aggregation 
metadata.
+ * @param nodeId
--- End diff --

docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143872059
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/StopNodeVisitor.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryBuilderVisitorBase;
+
+/**
+ *  Base class that enables all visitors extending this class to stop 
traversing
+ *  the Builder tree at a prescribed node.  Any visitor extending this 
class will
+ *  traverse the visitor tree and process all nodes up to and including 
the prescribed stop
+ *  node.  
+ */
+public abstract class StopNodeVisitor extends QueryBuilderVisitorBase {
+
+private String stopNodeId;
+private boolean processedStopNode = false;
+
+public StopNodeVisitor(FluoQuery.Builder fluoBuilder,String nodeId) {
+super(fluoBuilder);
+this.stopNodeId = checkNotNull(nodeId);
+}
+
+@Override
+public void visitNode(String nodeId) {
+//process the stop node, then stop traversing
+if(!processedStopNode) {
+processedStopNode = atStopNode(nodeId);
+super.visitNode(nodeId);
+}
+}
+
+boolean atStopNode(String nodeId) {
--- End diff --

docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869259
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/JoinMetadata.java
 ---
@@ -224,6 +231,20 @@ public VariableOrder getVariableOrder() {
 }
 
 /**
+ * Sets the Aggregation State.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869087
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FilterMetadata.java
 ---
@@ -221,6 +228,20 @@ public Builder setChildNodeId(@Nullable final String 
childNodeId) {
 public String getChildNodeId() {
 return childNodeId;
 }
+
+/**
+ * Sets the Aggregation State for this Filter node.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869370
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/QueryMetadata.java
 ---
@@ -215,6 +220,20 @@ public Builder setVarOrder(@Nullable final 
VariableOrder varOrder) {
 public VariableOrder getVariableOrder() {
 return varOrder;
 }
+
+/**
+ * Sets the Aggregation State.
+ * @param state - Aggregation State indicating current value of 
Aggregation 
+ * @return This builder so that method invocations may be chained. 
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl state) {
+this.state = state;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

Docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869465
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StateNodeMetadata.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Optional;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+
+/**
+ * Metadata class that is meant to be extended by any metadata node that 
could have Aggregation State.
+ *
+ */
+public class StateNodeMetadata extends CommonNodeMetadata {
+
+private Optional stateMetadata;
+
+/**
+ * Creates a StateNodeMetadata object with empty aggregation metadata.
+ * @param nodeId 
--- End diff --

docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143868033
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadataImpl.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Implementation of {@link CommonNodeMetadata} for storing metadata 
nodeIds and
+ * {@link VariableOrder}s.  This class is used primarily for storing 
Aggregation State 
+ * Metadata.
+ */
+public class CommonNodeMetadataImpl extends CommonNodeMetadata {
+
+public CommonNodeMetadataImpl(String nodeId, VariableOrder varOrder) {
+super(nodeId, varOrder);
+}
+
+public CommonNodeMetadataImpl(CommonNodeMetadataImpl metadata) {
+super(metadata.getNodeId(), metadata.getVariableOrder());
+}
+
+/**
+ * Reads/Writes instances of {@link CommonNodeMetadataImpl} to/from 
Json Strings.
+ */
+public static class CommonNodeMetadataSerDe {
+
+private static Logger log = 
Logger.getLogger(CommonNodeMetadataSerDe.class);
+private static Gson gson = new 
GsonBuilder().registerTypeAdapter(CommonNodeMetadataImpl.class, new 
CommonNodeTypeAdapter())
+.create();
+
+public static String serialize(CommonNodeMetadataImpl metadata) {
+try {
+return gson.toJson(metadata);
+} catch (Exception e) {
+log.info("Unable to serialize BatchInformation: " + 
metadata);
+throw new RuntimeException(e);
+}
+}
+
+public static Optional deserialize(String 
json) {
--- End diff --

Docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143868841
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
 ---
@@ -199,13 +206,27 @@ public Builder setParentNodeId(String parentNodeId) {
 this.parentNodeId = parentNodeId;
 return this;
 }
+
+/**
+ * Sets the state metadata of this {@link ConstructQueryMetadata}.
+ * @param stateMetadata
--- End diff --

docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143869627
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StateNodeMetadata.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.Optional;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Objects;
+
+/**
+ * Metadata class that is meant to be extended by any metadata node that 
could have Aggregation State.
+ *
+ */
+public class StateNodeMetadata extends CommonNodeMetadata {
+
+private Optional stateMetadata;
+
+/**
+ * Creates a StateNodeMetadata object with empty aggregation metadata.
+ * @param nodeId 
+ * @param varOrder
+ */
+public StateNodeMetadata(String nodeId, VariableOrder varOrder) {
+this(nodeId, varOrder, Optional.empty());
+}
+
+/**
+ * Creates a StateNodeMetadata object with indicated aggregation 
metadata.
+ * @param nodeId
+ * @param varOrder 
+ * @param aggregationStateMetadata - aggregation metadata to look up 
aggregation state
+ */
+public StateNodeMetadata(String nodeId, VariableOrder varOrder, 
Optional aggregationStateMetadata) {
+super(nodeId, varOrder);
+this.stateMetadata = aggregationStateMetadata;
+}
+
+/**
+ * @return - Optional containing aggregation metadata to look up 
aggregation state (if it exists)
+ */
+public Optional getStateMetadata() {
+return stateMetadata;
+}
+
+@Override
+public int hashCode() {
+return Objects.hashCode(super.getNodeId(), 
super.getVariableOrder(), stateMetadata);
+}
+
+@Override
+public boolean equals(final Object o) {
+if (this == o) {
+return true;
+}
+
+if (o instanceof StateNodeMetadata) {
+if (super.equals(o)) {
+final StateNodeMetadata metadata = (StateNodeMetadata) o;
+return new EqualsBuilder().append(stateMetadata, 
metadata.stateMetadata).isEquals();
--- End diff --

This could be Objects.equals(stateMetadata, metadata.stateMetadata); It 
requires less object construction.


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143870861
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/AggregationStateMetadataVisitor.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static jline.internal.Preconditions.checkNotNull;
+
+import org.apache.rya.indexing.pcj.fluo.app.query.CommonNodeMetadataImpl;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+
+/**
+ *  This builder visitor inserts the aggregation metadata into all of its
+ *  ancestors.
+ */
+public class AggregationStateMetadataVisitor extends StopNodeVisitor {
+
+private CommonNodeMetadataImpl aggStateMeta;
+
+public AggregationStateMetadataVisitor(FluoQuery.Builder 
fluoQueryBuilder, String stopNodeId, CommonNodeMetadataImpl aggStateMeta) {
+super(fluoQueryBuilder, stopNodeId);
+this.aggStateMeta = checkNotNull(aggStateMeta);
+}
+
+public void visit(QueryMetadata.Builder builder) {
--- End diff --

Should all of these be annotated with @Override?


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143868867
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
 ---
@@ -199,13 +206,27 @@ public Builder setParentNodeId(String parentNodeId) {
 this.parentNodeId = parentNodeId;
 return this;
 }
+
+/**
+ * Sets the state metadata of this {@link ConstructQueryMetadata}.
+ * @param stateMetadata
+ * @return This builder so that method invocations may be chained.
+ */
+public Builder setStateMetadata(CommonNodeMetadataImpl 
stateMetadata) {
+this.state = stateMetadata;
+return this;
+}
+
+public Optional getStateMetadata() {
--- End diff --

docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-10 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/240#discussion_r143867848
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/CommonNodeMetadataImpl.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.lang.reflect.Type;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * Implementation of {@link CommonNodeMetadata} for storing metadata 
nodeIds and
+ * {@link VariableOrder}s.  This class is used primarily for storing 
Aggregation State 
+ * Metadata.
+ */
+public class CommonNodeMetadataImpl extends CommonNodeMetadata {
+
+public CommonNodeMetadataImpl(String nodeId, VariableOrder varOrder) {
+super(nodeId, varOrder);
+}
+
+public CommonNodeMetadataImpl(CommonNodeMetadataImpl metadata) {
--- End diff --

Docs


---


[GitHub] incubator-rya pull request #240: Rya 374

2017-10-09 Thread meiercaleb
GitHub user meiercaleb opened a pull request:

https://github.com/apache/incubator-rya/pull/240

Rya 374


## Description
Added Aggregation State to Fluo Query Nodes so that any node that is 
dependent on Aggregation
Values can check the Aggregation state before writing to make sure that the 
state has not changed.  This helps provide the guarantee that any result 
emitted by Fluo that contains an Aggregation Value reflects the most up to date 
Aggregation state.

### Tests
Integration tests were written to verify that Aggregation state was being 
verified before writing and that the results emitted to Kafka reflected the 
current Aggregation state.

### Links
[Jira](https://issues.apache.org/jira/browse/RYA-372)

### Checklist
- [ ] Code Review
- [ ] Squash Commits

 People To Reivew
@kchilton2 @jdasch 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/meiercaleb/incubator-rya RYA-374

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #240


commit 07d52cbf5a6a9028aac45ed7b68d933419ffe8a9
Author: Caleb Meier 
Date:   2017-09-21T14:47:47Z

RYA-374-Aggregation Value Join Bug Fix

commit 003045c80429f8ec08c45378cfc0dd56be2e1084
Author: Caleb Meier 
Date:   2017-10-09T15:03:49Z

Broke KafkaExport IT in two




---