This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git

commit 4aa381224658c5ca92431e31cb10220a5a50339c
Author: Munagala V. Ramanath <amberar...@users.noreply.github.com>
AuthorDate: Tue Feb 23 05:58:46 2016 -0800

    Use algo/UniqueCounter to reduce code
---
 .../java/com/example/myapexapp/Application.java    | 37 ++++------------------
 1 file changed, 7 insertions(+), 30 deletions(-)

diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
index 8050d67..7700d68 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
@@ -16,6 +16,7 @@ import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
 import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -32,12 +33,12 @@ public class Application implements StreamingApplication
   {
     KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
     kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
-    UniqueCounter<String> count = dag.addOperator("count", new UniqueCounter<String>());
+    UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
     store.setStore(new JdbcTransactionalStore());
     ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
     dag.addStream("words", kafkaInput.outputPort, count.data);
-    dag.addStream("counts", count.count, store.input, cons.input);
+    dag.addStream("counts", count.counts, store.input, cons.input);
   }
 
   public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
@@ -62,42 +63,18 @@ public class Application implements StreamingApplication
     }
   }
 
-  public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
+  public static class UniqueCounterFlat extends UniqueCounter<String>
   {
-    /**
-     * The input port which receives incoming tuples.
-     */
-    public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
-    {
-      /**
-       * Reference counts tuples
-       */
-      @Override
-      public void process(K tuple)
-      {
-        processTuple(tuple);
-      }
-
-    };
-
-    public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
-    {
-      @Override
-      public Unifier<KeyValPair<K, Integer>> getUnifier()
-      {
-        throw new UnsupportedOperationException("not partitionable");
-      }
-    };
+    public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
 
     @Override
     public void endWindow()
     {
-      for (Map.Entry<K, MutableInt> e: map.entrySet()) {
-        count.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
+      for (Map.Entry<String, MutableInt> e: map.entrySet()) {
+        counts.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
       }
       map.clear();
     }
-
   }
 
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@apex.apache.org" <commits@apex.apache.org>.

Reply via email to