RYA-377 Implement the Aggregation Processor for Rya Streams.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/95df37a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/95df37a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/95df37a3 Branch: refs/heads/master Commit: 95df37a30ff6e4f2e70a863872e1087a447de47f Parents: 8363724 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Tue Nov 21 16:53:38 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../aggregation/AggregationStateStore.java | 49 ++ .../aggregation/AggregationsEvaluator.java | 175 +++++++ .../AggregationProcessorSupplier.java | 158 +++++++ .../KeyValueAggregationStateStore.java | 104 +++++ .../streams/kafka/topology/TopologyFactory.java | 24 +- .../aggregation/AggregationProcessorIT.java | 457 +++++++++++++++++++ 6 files changed, 962 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java new file mode 100644 index 0000000..d37f4c7 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationStateStore.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import java.util.Optional; + +import org.apache.rya.api.model.VisibilityBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Provides a mechanism for storing the updating {@link AggregationState} while using an {@link AggregationsEvaluator}. + */ +@DefaultAnnotation(NonNull.class) +public interface AggregationStateStore { + + /** + * Stores a state. If this value updates a previously stored state, then it will overwrite the old value + * with the new one. + * + * @param state - The state that will be stored. (not null) + */ + public void store(AggregationState state); + + /** + * Get the {@link AggregationState} that may be updated using the provided binding set. + * + * @param bs - A binding set that defines which state to fetch. (not null) + * @return The {@link AggregationState} that is updated by the binding set, if one has been stored. 
+ */ + public Optional<AggregationState> get(VisibilityBindingSet bs); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java new file mode 100644 index 0000000..2aa716f --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationsEvaluator.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.aggregation; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.visibility.VisibilitySimplifier; +import org.openrdf.query.algebra.AggregateOperator; +import org.openrdf.query.algebra.Group; +import org.openrdf.query.algebra.GroupElem; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.ImmutableMap; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A stateful evaluator that processes aggregation functions over variables that are grouped together. + * </p> + * The following aggregation functions are supported: + * <ul> + * <li>Count</li> + * <li>Sum</li> + * <li>Average</li> + * <li>Min</li> + * <li>Max</li> + * </ul> + * </p> + * The persistence of the aggregation's state is determined by the provided {@link AggregationStateStore}. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AggregationsEvaluator { + + private static final ImmutableMap<AggregationType, AggregationFunction> FUNCTIONS; + static { + final ImmutableMap.Builder<AggregationType, AggregationFunction> builder = ImmutableMap.builder(); + builder.put(AggregationType.COUNT, new CountFunction()); + builder.put(AggregationType.SUM, new SumFunction()); + builder.put(AggregationType.AVERAGE, new AverageFunction()); + builder.put(AggregationType.MIN, new MinFunction()); + builder.put(AggregationType.MAX, new MaxFunction()); + FUNCTIONS = builder.build(); + } + + private final AggregationStateStore aggStateStore; + private final Collection<AggregationElement> aggregations; + private final List<String> groupByVars; + + /** + * Constructs an instance of {@link AggregationsEvaluator}. + * + * @param aggStateStore - The mechanism for storing aggregation state. (not null) + * @param aggregations - The aggregation functions that will be computed. (not null) + * @param groupByVars - The names of the binding whose values are used to group aggregation results. (not null) + */ + public AggregationsEvaluator( + final AggregationStateStore aggStateStore, + final Collection<AggregationElement> aggregations, + final List<String> groupByVars) { + this.aggStateStore = requireNonNull(aggStateStore); + this.aggregations = requireNonNull(aggregations); + this.groupByVars = requireNonNull(groupByVars); + } + + /** + * Make an instance of {@link AggregationsEvaluator} based on a {@link Group} node. + * + * @param aggStateStore - The mechanism for storing aggregation state. (not null) + * @param aggNode - Defines which aggregation functions need to be performed. + * @param groupByVars - The names of the binding whose values are used to group aggregation results. (not null) + * @return The evaluator that handles the node's aggregations. 
+ */ + public static AggregationsEvaluator make(final AggregationStateStore aggStateStore, final Group aggNode, final List<String> groupByVars) { + requireNonNull(aggStateStore); + requireNonNull(aggNode); + requireNonNull(groupByVars); + + // The aggregations that need to be performed are the Group Elements. + final List<AggregationElement> aggregations = new ArrayList<>(); + for(final GroupElem groupElem : aggNode.getGroupElements()) { + // Figure out the type of the aggregation. + final AggregateOperator operator = groupElem.getOperator(); + final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() ); + + // If the type is one we support, create the AggregationElement. + if(type.isPresent()) { + final String resultBindingName = groupElem.getName(); + + final AtomicReference<String> aggregatedBindingName = new AtomicReference<>(); + groupElem.visitChildren(new QueryModelVisitorBase<RuntimeException>() { + @Override + public void meet(final Var node) { + aggregatedBindingName.set( node.getName() ); + } + }); + + aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) ); + } + } + + return new AggregationsEvaluator(aggStateStore, aggregations, groupByVars); + } + + /** + * Update the aggregation values using information found within {@code newBs}. + * + * @param newBs - A binding set whose values need to be incorporated within the aggregations. (not null) + * @return A binding set containing the updated aggregation values. + */ + public VisibilityBindingSet update(final VisibilityBindingSet newBs) { + requireNonNull(newBs); + + // Load the old state if one was previously stored; otherwise initialize the state. + final AggregationState state = aggStateStore.get(newBs).orElseGet(() -> { + // Initialize a new state. + final AggregationState newState = new AggregationState(); + + // If we have group by bindings, their values need to be added to the state's binding set. 
+ final MapBindingSet bindingSet = newState.getBindingSet(); + for(final String groupByVar : groupByVars) { + bindingSet.addBinding( newBs.getBinding(groupByVar) ); + } + + return newState; + }); + + // Update the visibilities of the result binding set based on the new result's visibilities. + final String oldVisibility = state.getVisibility(); + final String updateVisibilities = VisibilitySimplifier.unionAndSimplify(oldVisibility, newBs.getVisibility()); + state.setVisibility(updateVisibilities); + + // Update the Aggregation State with each Aggregation function included within this group. + for(final AggregationElement aggregation : aggregations) { + final AggregationType type = aggregation.getAggregationType(); + final AggregationFunction function = FUNCTIONS.get(type); + if(function == null) { + throw new RuntimeException("Unrecognized aggregation function: " + type); + } + + function.update(aggregation, state, newBs); + } + + // Store the updated state. This will write on top of any old state that was present for the Group By values. + aggStateStore.store(state); + + // Return the updated binding set from the updated state. 
+ return new VisibilityBindingSet(state.getBindingSet(), state.getVisibility()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java new file mode 100644 index 0000000..c8e1049 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorSupplier.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.aggregation; + +import static java.util.Objects.requireNonNull; + +import java.util.Comparator; +import java.util.List; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.rya.api.function.aggregation.AggregationState; +import org.apache.rya.api.function.aggregation.AggregationStateStore; +import org.apache.rya.api.function.aggregation.AggregationsEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType; +import org.apache.rya.streams.kafka.processors.ProcessorResultFactory; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier; +import org.openrdf.query.algebra.Group; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.internal.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Supplies {@link AggregationProcessor} instances. + */ +@DefaultAnnotation(NonNull.class) +public class AggregationProcessorSupplier extends RyaStreamsProcessorSupplier { + + private final String stateStoreName; + private final Group aggNode; + + /** + * Constructs an instance of {@link AggregationProcessorSupplier}. + * + * @param stateStoreName - The name of the state store the processor will use. (not null) + * @param aggNode - Defines which aggregations will be performed by the processor. (not null) + * @param resultFactory - The factory that the supplied processors will use to create results. 
(not null) + */ + public AggregationProcessorSupplier( + final String stateStoreName, + final Group aggNode, + final ProcessorResultFactory resultFactory) { + super(resultFactory); + this.stateStoreName = requireNonNull(stateStoreName); + this.aggNode = requireNonNull(aggNode); + } + + @Override + public Processor<Object, ProcessorResult> get() { + return new AggregationProcessor(stateStoreName, aggNode, super.getResultFactory()); + } + + /** + * Evaluates a {@link Group} node that contains a bunch of aggregations. Each aggregation will have a binding + * within the resulting binding sets that contains the aggregation value. + * + * @see AggregationsEvaluator + */ + @DefaultAnnotation(NonNull.class) + public static class AggregationProcessor extends RyaStreamsProcessor { + private static final Logger log = LoggerFactory.getLogger(AggregationProcessor.class); + + private final String stateStoreName; + private final Group aggNode; + + private ProcessorContext context; + private AggregationStateStore aggStateStore; + private AggregationsEvaluator evaluator; + + /** + * Constructs an instance of {@link AggregationProcessor}. + * + * @param stateStoreName - The name of the Kafka Streams state store that this processor will use. (not null) + * @param aggNode - The group by node that configures how the aggregations will be performed. (not null) + * @param resultFactory - The factory that will format this processor's final results for the downstream + * processor. (not null) + */ + public AggregationProcessor( + final String stateStoreName, + final Group aggNode, + final ProcessorResultFactory resultFactory) { + super(resultFactory); + this.stateStoreName = requireNonNull(stateStoreName); + this.aggNode = requireNonNull(aggNode); + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + + // Sort the group by vars so that they will be written to the state store in the same order every time. 
+ final List<String> groupByVars = Lists.newArrayList(aggNode.getGroupBindingNames()); + groupByVars.sort(Comparator.naturalOrder()); + + // Get a reference to the state store that keeps track of aggregation state. + final KeyValueStore<String, AggregationState> stateStore = + (KeyValueStore<String, AggregationState>) context.getStateStore( stateStoreName ); + aggStateStore = new KeyValueAggregationStateStore(stateStore, groupByVars); + + // Create the aggregation evaluator. + evaluator = AggregationsEvaluator.make(aggStateStore, aggNode, groupByVars); + } + + @Override + public void process(final Object key, final ProcessorResult value) { + // Aggregations can only be unary. + if (value.getType() != ResultType.UNARY) { + throw new RuntimeException("The ProcessorResult to be processed must be Unary."); + } + + // Log the binding set that has been input. + log.debug("\nINPUT:\nBinding Set: {}", value.getUnary().getResult()); + + // Update the aggregation values. + final VisibilityBindingSet resultBs = evaluator.update(value.getUnary().getResult()); + + // Log the binding set that will be output. + log.debug("\nOUTPUT:\nBinding Set: {}", resultBs); + + // Forward the updated aggregation binding set to the downstream processors. + context.forward(key, super.getResultFactory().make(resultBs)); + } + + @Override + public void punctuate(final long timestamp) { + // Do nothing. + } + + @Override + public void close() { + // Do nothing. 
+ } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java new file mode 100644 index 0000000..3300590 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/aggregation/KeyValueAggregationStateStore.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.aggregation; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.rya.api.function.aggregation.AggregationState; +import org.apache.rya.api.function.aggregation.AggregationStateStore; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.query.BindingSet; + +import com.google.common.base.Joiner; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link KeyValueStore} implementation of {@link AggregationStateStore}. + * </p> + * This is a key/value store, so we need to store the {@link AggregationState} for each set of group by values + * using a key that is composed with those values. We use the following pattern to accomplish this: + * <pre> + * [groupByVar1 value],[groupByVar2 value],...,[groupByVarN value] + * </pre> + */ +@DefaultAnnotation(NonNull.class) +public class KeyValueAggregationStateStore implements AggregationStateStore { + + private final KeyValueStore<String, AggregationState> store; + private final List<String> groupByVars; + + /** + * Constructs an instance of {@link KeyValueAggregationStateStore}. + * + * @param store - The state store that will be used. (not null) + * @param groupByVars - An ordered list of group by variable names. (not null) + */ + public KeyValueAggregationStateStore( + final KeyValueStore<String, AggregationState> store, + final List<String> groupByVars) { + this.store = requireNonNull(store); + this.groupByVars = requireNonNull(groupByVars); + } + + @Override + public void store(final AggregationState state) { + requireNonNull(state); + + // Aggregations group their states by their group by variables, so the key is the resulting binding + // set's values for the group by variables. 
+ final String key = makeCommaDelimitedValues(groupByVars, state.getBindingSet()); + store.put(key, state); + } + + @Override + public Optional<AggregationState> get(final VisibilityBindingSet bs) { + requireNonNull(bs); + + final String key = makeCommaDelimitedValues(groupByVars, bs); + return Optional.ofNullable(store.get(key)); + } + + /** + * A utility function that helps construct the keys used by {@link KeyValueAggregationStateStore}. + * + * @param vars - Which variables within the binding set to use for the key's values. (not null) + * @param bindingSet - The binding set the key is being constructed from. (not null) + * @return A comma delimited list of the binding values, in the order of {@code vars}. A variable that has no binding contributes an empty string. + */ + private static String makeCommaDelimitedValues(final List<String> vars, final BindingSet bindingSet) { + requireNonNull(vars); + requireNonNull(bindingSet); + + // Make an ordered list of the binding set's values for those variables. + final List<String> values = new ArrayList<>(); + for(final String var : vars) { + values.add( bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : "" ); + } + + // Return a comma delimited list of those values. 
+ return Joiner.on(",").join(values); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java index 426b041..4046e23 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java @@ -51,6 +51,7 @@ import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; +import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier; import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier; import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier; import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier; @@ -65,6 +66,7 @@ import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.BinaryTupleOperator; import org.openrdf.query.algebra.Extension; import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Group; import org.openrdf.query.algebra.Join; import org.openrdf.query.algebra.LeftJoin; import org.openrdf.query.algebra.MultiProjection; @@ -94,6 +96,7 @@ public class TopologyFactory implements TopologyBuilderFactory { private static final String JOIN_PREFIX = "JOIN_"; private static final String PROJECTION_PREFIX = 
"PROJECTION_"; private static final String FILTER_PREFIX = "FILTER_"; + private static final String AGGREGATION_PREFIX = "AGGREGATION_"; private static final String SINK = "SINK"; private List<ProcessorEntry> processorEntryList; @@ -141,14 +144,15 @@ public class TopologyFactory implements TopologyBuilderFactory { builder.addProcessor(entry.getID(), entry.getSupplier(), parentIDs); } - if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin) { + // Add a state store for any node type that requires one. + if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin || entry.getNode() instanceof Group) { // Add a state store for the join processor. final StateStoreSupplier joinStoreSupplier = Stores.create( entry.getID() ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .persistent() - .build(); + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .persistent() + .build(); builder.addStateStore(joinStoreSupplier, entry.getID()); } } @@ -459,6 +463,16 @@ public class TopologyFactory implements TopologyBuilderFactory { super.meet(node); } + @Override + public void meet(final Group node) throws TopologyBuilderException { + final String id = AGGREGATION_PREFIX + UUID.randomUUID(); + final Optional<Side> side = getSide(node); + final AggregationProcessorSupplier supplier = new AggregationProcessorSupplier(id, node, (result) -> getResult(side, result)); + entries.add( new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(node.getArg())) ); + idMap.put(node, id); + super.meet(node); + } + /** * Gets the {@link Side} the current node in the visitor is on relative to the provided node. * @param node - The node used to determine the side of the current visitor node. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/95df37a3/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java new file mode 100644 index 0000000..ccf5c0c --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.aggregation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Integration tests {@link AggregationProcessor}. + */ +public class AggregationProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); + + @Test + public void count() throws Exception { + // A query that figures out how many books each person has. + final String sparql = + "SELECT ?person (count(?book) as ?bookCount) " + + "WHERE { " + + "?person <urn:hasBook> ?book " + + "} GROUP BY ?person"; + + // Create the statements that will be input into the query.. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "a")); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "a&b")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. 
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void sum() throws Exception { + // A query that figures out how much food each person has. + final String sparql = + "SELECT ?person (sum(?foodCount) as ?totalFood) " + + "WHERE { " + + "?person <urn:hasFoodType> ?food . " + + "?food <urn:count> ?foodCount . " + + "} GROUP BY ?person"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), "")); + + // Make the expected results. 
+ final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void average() throws Exception { + // A query that figures out the average age across all people. + final String sparql = + "SELECT (avg(?age) as ?avgAge) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), "")); + + // Make the expected results. 
+ final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void min() throws Exception { + // A query that figures out what the youngest age is across all people. + final String sparql = + "SELECT (min(?age) as ?youngest) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(13)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(7)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(5)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. 
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void max() throws Exception { + // A query that figures out what the oldest age is across all people. + final String sparql = + "SELECT (max(?age) as ?oldest) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); + + // Make the expected results. 
+ final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("oldest", vf.createLiteral(13)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("oldest", vf.createLiteral(25)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void multipleGroupByVars() throws Exception { + // A query that contains more than one group by variable. + final String sparql = + "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " + + "WHERE {" + + "?employee <urn:worksAt> ?business . " + + "?business <urn:hasTimecardId> ?timecardId . " + + "?employee <urn:hasTimecardId> ?timecardId . " + + "?timecardId <urn:hours> ?hours . " + + "} GROUP BY ?business ?employee"; + + // Create the statements that will be input into the query. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), 
vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), "")); + + // Make the expected results. 
+ final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Alice")); + bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Alice")); + bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:CoffeeShop")); + bs.addBinding("employee", vf.createURI("urn:Alice")); + bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. 
+ RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 4000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void multipleAggregations() throws Exception { + // A query that figures out what the youngest and oldest ages are across all people. + final String sparql = + "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); + + // Make the expected results. 
+ final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(13)); + bs.addBinding("oldest", vf.createLiteral(13)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(13)); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(7)); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(5)); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(5)); + bs.addBinding("oldest", vf.createLiteral(25)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } +} \ No newline at end of file