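Hi Vivek,

In the case of dedup with expiry, the Deduper assumes that the dedup key is bound to the timestamp (expiry key): tuples with the same dedup key must always carry the same expiry time. In your generator, id is either 0 or 1 while eventTime is set to the current time for every tuple, so the same id keeps arriving with different expiry times and this assumption does not hold. See https://apex.apache.org/docs/malhar/operators/deduper/#assumptions for more details.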
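To check whether a stream satisfies this assumption before feeding it to the Deduper, a small standalone sketch can remember the first expiry time seen for each key and flag any later tuple whose key comes back with a different time. The class `DedupKeyTimeCheck` is hypothetical and not part of Malhar. It replays two tuples from your console output (id=1 with eventTime.getTime() equal to 1520413075333 and 1520413075334; in your generator eventTime.getTime() equals millis). It then tries a hypothetical generator whose time is derived from the id.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper, not part of Apex Malhar: checks sample data against the
 * Deduper assumption for dedup with expiry, namely that equal dedup keys must
 * always carry equal expiry (time) values.
 */
public class DedupKeyTimeCheck
{
  // First expiry time seen for each dedup key.
  private final Map<Integer, Long> firstTimeByKey = new HashMap<>();

  /**
   * Records a (dedup key, expiry time) pair. Returns false if the key was seen
   * before with a different expiry time, i.e. the assumption is violated.
   * The first time stored for a key is never overwritten.
   */
  public boolean observe(int key, long expiryTime)
  {
    Long first = firstTimeByKey.putIfAbsent(key, expiryTime);
    return first == null || first == expiryTime;
  }

  public static void main(String[] args)
  {
    // Two tuples from the console output in the question:
    // same id, expiry times one millisecond apart.
    DedupKeyTimeCheck fromQuestion = new DedupKeyTimeCheck();
    System.out.println(fromQuestion.observe(1, 1520413075333L)); // true: first time id=1 is seen
    System.out.println(fromQuestion.observe(1, 1520413075334L)); // false: id=1 with a different time

    // Hypothetical generator that satisfies the assumption: the event time is
    // derived from the id, so a repeated id always repeats the same time.
    DedupKeyTimeCheck derived = new DedupKeyTimeCheck();
    long base = 1520413075000L;
    boolean allConsistent = true;
    for (int i = 0; i < 10; i++) {
      int id = i % 2;
      allConsistent &= derived.observe(id, base + id * 1000L);
    }
    System.out.println(allConsistent); // true
  }
}
```

The derived-time generator is only there to show what "binding" means in practice. With real data, the equivalent is a dedup key that identifies a single event, so that its event time never changes between repeats.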
~ Bhupesh
_______________________________________________________
Bhupesh Chawda
E: bhup...@datatorrent.com | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org

On Fri, Mar 16, 2018 at 2:35 AM, Vivek Bhide <vivek.bh...@target.com> wrote:
> Hi,
>
> I'm trying to understand how TimeBasedDedupOperator works for my streaming
> application. I'm using the example shown in the Malhar dedup example:
> https://github.com/apache/apex-malhar/blob/master/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java
>
> I made a few modifications to minimize the output.
> Properties:
> <property>
>   <name>dt.application.DedupExample.operator.Deduper.prop.keyExpression</name>
>   <value>id</value>
> </property>
> <property>
>   <name>dt.application.DedupExample.operator.Deduper.prop.timeExpression</name>
>   <value>eventTime.getTime()</value>
> </property>
> <property>
>   <name>dt.application.DedupExample.operator.Deduper.prop.bucketSpan</name>
>   <value>10</value>
> </property>
> <property>
>   <name>dt.application.DedupExample.operator.Deduper.prop.expireBefore</name>
>   <value>60</value>
> </property>
>
> Below is the Application code:
>
> public class Application implements StreamingApplication
> {
>   @Override
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>     // Test Data Generator Operator
>     RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator",
>         new RandomDataGeneratorOperator());
>
>     // Dedup Operator. Configuration through resources/META-INF/properties.xml
>     TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());
>
>     // Console output operator for unique tuples
>     ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique",
>         new ConsoleOutputOperator());
>
>     // Streams
>     dag.addStream("Generator to Dedup", gen.output, dedup.input);
>
>     // Connect Dedup unique to Console
>     dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);
>
>     // Set Attribute TUPLE_CLASS for supplying schema information to the port
>     dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS,
>         TestEvent.class);
>   }
>
>   public static class RandomDataGeneratorOperator extends BaseOperator
>       implements InputOperator
>   {
>     public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
>     private final transient Random r = new Random();
>     private int tuplesPerWindow = 100;
>     private transient int count = 0;
>
>     @Override
>     public void beginWindow(long windowId)
>     {
>       count = 0;
>     }
>
>     @Override
>     public void emitTuples()
>     {
>       if (count++ > tuplesPerWindow) {
>         return;
>       }
>       TestEvent event = new TestEvent();
>       event.id = r.nextInt(2);
>       long millis = System.currentTimeMillis();
>       event.millis = millis;
>       event.setTimeNow(new Date(millis));
>       // event.eventTime = new Date(millis - (r.nextInt(60 * 1000)));
>       event.eventTime = new Date(millis);
>       output.emit(event);
>     }
>   }
>
>   public static class TestEvent
>   {
>     private int id;
>     private Date timeNow;
>     private Date eventTime;
>     private long millis;
>
>     public TestEvent()
>     {
>     }
>
>     public long getMillis()
>     {
>       return millis;
>     }
>
>     public int getId()
>     {
>       return id;
>     }
>
>     public void setId(int id)
>     {
>       this.id = id;
>     }
>
>     public Date getEventTime()
>     {
>       return eventTime;
>     }
>
>     public void setTimeNow(Date timeNow)
>     {
>       this.timeNow = timeNow;
>     }
>
>     public Date getTimeNow()
>     {
>       return timeNow;
>     }
>
>     public void setEventTime(Date eventTime)
>     {
>       this.eventTime = eventTime;
>     }
>
>     @Override
>     public String toString()
>     {
>       return "TestEvent [id=" + id + "; millis = " + millis + "; nowTime=" + timeNow
>           + "; eventTime=" + eventTime + "]";
>     }
>   }
> }
>
> I executed this application in a JUnit test using LocalMode, but in the
> console output I see duplicate records. I'm trying to understand why these
> duplicates appear on the unique console:
> 1. Unique: TestEvent [id=1; millis = 1520413075333; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 2. Unique: TestEvent [id=1; millis = 1520413075334; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 3. Unique: TestEvent [id=0; millis = 1520413075363; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 4. Unique: TestEvent [id=0; millis = 1520413075364; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 5. Unique: TestEvent [id=0; millis = 1520413075365; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 6. Unique: TestEvent [id=0; millis = 1520413075366; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 7. Unique: TestEvent [id=0; millis = 1520413075367; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 8. Unique: TestEvent [id=0; millis = 1520413075368; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 9. Unique: TestEvent [id=0; millis = 1520413075369; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
> 10. Unique: TestEvent [id=1; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
> 11. Unique: TestEvent [id=0; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
> 12. Unique: TestEvent [id=0; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
> 13. Unique: TestEvent [id=1; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
>
> I see a lot of duplicates on the unique port. Did I set any configuration wrong?
>
> Any suggestions are appreciated.
>
> Thanks
>
> --
> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/