Hi Vivek,

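The deduper assumes a binding between the dedup key and the timestamp
(expiry key) when deduplicating with expiry: any two tuples with the same
dedup key are expected to have the same expiry key as well. Your generator
stamps every tuple with the current time, so tuples sharing an id carry
different timestamps and this assumption does not hold.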
See https://apex.apache.org/docs/malhar/operators/deduper/#assumptions for
more details.
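To make the assumption concrete, here is a small, self-contained Java sketch. It does not use Malhar, and the class and method names (KeyTimeBindingCheck, violatesBinding, stampWithArrivalTime, bindTimeToKey) are hypothetical. It checks whether a sequence of (dedup key, event time) pairs satisfies the binding. It then contrasts two generators: one that stamps each tuple with its own time, as your test does, and one that fixes the event time the first time an id is seen. It only validates input data against the assumption; it does not model the deduper's bucketing or managed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical, Malhar-independent sketch: checks the "same dedup key => same
// expiry key" assumption on a list of events. It does not model the deduper itself.
public class KeyTimeBindingCheck
{
  // Minimal stand-in for TestEvent: just the dedup key (id) and the expiry time.
  static final class Event
  {
    final int id;
    final long eventTimeMillis;

    Event(int id, long eventTimeMillis)
    {
      this.id = id;
      this.eventTimeMillis = eventTimeMillis;
    }
  }

  // Returns true if some dedup key appears with two different expiry times.
  static boolean violatesBinding(List<Event> events)
  {
    Map<Integer, Long> firstTimeByKey = new HashMap<>();
    for (Event e : events) {
      Long previous = firstTimeByKey.putIfAbsent(e.id, e.eventTimeMillis);
      if (previous != null && previous.longValue() != e.eventTimeMillis) {
        return true;
      }
    }
    return false;
  }

  // Like the quoted generator: every tuple gets its own time. Here the time is
  // startMillis + i so it is distinct per tuple and the result is deterministic.
  static List<Event> stampWithArrivalTime(int count, long startMillis, Random r)
  {
    List<Event> events = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      events.add(new Event(r.nextInt(2), startMillis + i));
    }
    return events;
  }

  // Alternative: the time is fixed the first time an id is seen and reused for
  // every later tuple with that id, so duplicates share their expiry key.
  static List<Event> bindTimeToKey(int count, long startMillis, Random r)
  {
    Map<Integer, Long> timeByKey = new HashMap<>();
    List<Event> events = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      int id = r.nextInt(2);
      long now = startMillis + i;
      long eventTime = timeByKey.computeIfAbsent(id, k -> now);
      events.add(new Event(id, eventTime));
    }
    return events;
  }

  public static void main(String[] args)
  {
    long start = 1520413075333L;
    System.out.println("arrival time violates binding: "
        + violatesBinding(stampWithArrivalTime(100, start, new Random(42))));
    System.out.println("key-bound time violates binding: "
        + violatesBinding(bindTimeToKey(100, start, new Random(42))));
  }
}
```

Running main prints true for the arrival-time generator and false for the key-bound one. With 100 tuples over only two ids, at least one id must repeat with a different time in the first case. Whether binding the time to the id this way matches your real data depends on how duplicates arise in your use case.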

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Fri, Mar 16, 2018 at 2:35 AM, Vivek Bhide <vivek.bh...@target.com> wrote:

> Hi,
>
> I'm trying to understand how TimeBasedDedupOperator works for my streaming
> application. I'm using the Malhar dedup example:
> https://github.com/apache/apex-malhar/blob/master/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java
>
> I made a few modifications to minimize the output.
> Properties:
>   <property>
>     <name>dt.application.DedupExample.operator.Deduper.prop.keyExpression</name>
>     <value>id</value>
>   </property>
>   <property>
>     <name>dt.application.DedupExample.operator.Deduper.prop.timeExpression</name>
>     <value>eventTime.getTime()</value>
>   </property>
>   <property>
>     <name>dt.application.DedupExample.operator.Deduper.prop.bucketSpan</name>
>     <value>10</value>
>   </property>
>   <property>
>     <name>dt.application.DedupExample.operator.Deduper.prop.expireBefore</name>
>     <value>60</value>
>   </property>
>
> Below is the application code:
>
> public class Application implements StreamingApplication
> {
>
>   @Override
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>     // Test Data Generator Operator
>     RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator",
>         new RandomDataGeneratorOperator());
>
>     // Dedup Operator. Configuration through resources/META-INF/properties.xml
>     TimeBasedDedupOperator dedup = dag.addOperator("Deduper",
>         new TimeBasedDedupOperator());
>
>     // Console output operator for unique tuples
>     ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique",
>         new ConsoleOutputOperator());
>
>     // Streams
>     dag.addStream("Generator to Dedup", gen.output, dedup.input);
>
>     // Connect Dedup unique to Console
>     dag.addStream("Dedup Unique to Console", dedup.unique,
>         consoleUnique.input);
>     // Set Attribute TUPLE_CLASS for supplying schema information to the port
>     dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS,
>         TestEvent.class);
>   }
>
>   public static class RandomDataGeneratorOperator extends BaseOperator
>       implements InputOperator
>   {
>
>     public final transient DefaultOutputPort<TestEvent> output =
>         new DefaultOutputPort<>();
>     private final transient Random r = new Random();
>     private int tuplesPerWindow = 100;
>     private transient int count = 0;
>
>     @Override
>     public void beginWindow(long windowId)
>     {
>       count = 0;
>     }
>
>     @Override
>     public void emitTuples()
>     {
>       // Emit at most tuplesPerWindow tuples per streaming window
>       if (count++ >= tuplesPerWindow) {
>         return;
>       }
>       TestEvent event = new TestEvent();
>       event.id = r.nextInt(2);
>       long millis = System.currentTimeMillis();
>       event.millis = millis;
>       event.setTimeNow(new Date(millis));
> //      event.eventTime = new Date( millis - (r.nextInt(60 * 1000)));
>       event.eventTime = new Date(millis);
>       output.emit(event);
>     }
>   }
>
>   public static class TestEvent
>   {
>     private int id;
>     private Date timeNow;
>     private Date eventTime;
>     private long millis;
>
>     public TestEvent()
>     {
>     }
>     public long getMillis() { return millis; }
>
>     public int getId()
>     {
>       return id;
>     }
>
>     public void setId(int id)
>     {
>       this.id = id;
>     }
>
>     public Date getEventTime()
>     {
>       return eventTime;
>     }
>
>     public void setTimeNow(Date timeNow) {
>       this.timeNow = timeNow;
>     }
>
>     public Date getTimeNow() {
>       return timeNow;
>     }
>
>     public void setEventTime(Date eventTime)
>     {
>       this.eventTime = eventTime;
>     }
>
>     @Override
>     public String toString()
>     {
>       return "TestEvent [id=" + id + "; millis = " + millis + "; nowTime="
>           + timeNow + "; eventTime=" + eventTime + "]";
>     }
>
>   }
>
> }
>
> I executed this application from a JUnit test in local mode, but the
> console output shows duplicate records. I'm trying to understand why
> these duplicates appear in the console attached to the unique port:
> 1. Unique: TestEvent [id=1; millis = 1520413075333; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 2. Unique: TestEvent [id=1; millis = 1520413075334; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 3. Unique: TestEvent [id=0; millis = 1520413075363; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 4. Unique: TestEvent [id=0; millis = 1520413075364; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 5. Unique: TestEvent [id=0; millis = 1520413075365; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 6. Unique: TestEvent [id=0; millis = 1520413075366; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 7. Unique: TestEvent [id=0; millis = 1520413075367; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 8. Unique: TestEvent [id=0; millis = 1520413075368; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 9. Unique: TestEvent [id=0; millis = 1520413075369; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 10. Unique: TestEvent [id=1; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
> 11. Unique: TestEvent [id=0; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
> 12. Unique: TestEvent [id=0; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
> 13. Unique: TestEvent [id=1; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
>
> I see a lot of duplicates on the unique port. Did I configure something
> incorrectly?
>
> Any suggestions are appreciated.
>
> Thanks
>
>
>
> --
> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
>
