This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
commit 4aa381224658c5ca92431e31cb10220a5a50339c Author: Munagala V. Ramanath <amberar...@users.noreply.github.com> AuthorDate: Tue Feb 23 05:58:46 2016 -0800 Use algo/UniqueCounter to reduce code --- .../java/com/example/myapexapp/Application.java | 37 ++++------------------ 1 file changed, 7 insertions(+), 30 deletions(-) diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java index 8050d67..7700d68 100644 --- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java +++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java @@ -16,6 +16,7 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; import com.datatorrent.lib.io.ConsoleOutputOperator; @@ -32,12 +33,12 @@ public class Application implements StreamingApplication { KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator()); kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); - UniqueCounter<String> count = dag.addOperator("count", new UniqueCounter<String>()); + UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat()); CountStoreOperator store = dag.addOperator("store", new CountStoreOperator()); store.setStore(new JdbcTransactionalStore()); ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator()); dag.addStream("words", kafkaInput.outputPort, count.data); - dag.addStream("counts", count.count, store.input, cons.input); + dag.addStream("counts", count.counts, store.input, cons.input); } public static class 
CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>> @@ -62,42 +63,18 @@ public class Application implements StreamingApplication } } - public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K> + public static class UniqueCounterFlat extends UniqueCounter<String> { - /** - * The input port which receives incoming tuples. - */ - public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() - { - /** - * Reference counts tuples - */ - @Override - public void process(K tuple) - { - processTuple(tuple); - } - - }; - - public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>() - { - @Override - public Unifier<KeyValPair<K, Integer>> getUnifier() - { - throw new UnsupportedOperationException("not partitionable"); - } - }; + public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>(); @Override public void endWindow() { - for (Map.Entry<K, MutableInt> e: map.entrySet()) { - count.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger())); + for (Map.Entry<String, MutableInt> e: map.entrySet()) { + counts.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger())); } map.clear(); } - } } -- To stop receiving notification emails like this one, please contact "commits@apex.apache.org" <commits@apex.apache.org>.