Hi Yifei,

You could do a fields grouping on the join field (say id) so that tuples with 
the same id end up in the same bolt task, i.e. something like:

topologyBuilder.setBolt("bolt", windowedBolt, parallelism_hint)
    .fieldsGrouping("spout1", new Fields("id"))
    .fieldsGrouping("spout2", new Fields("id"));

You will have to do the join within the windowedBolt’s execute method, where, 
every 3 seconds, you will receive the last 3 seconds of tuples from both spouts.
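
As a rough illustration of the per-window join, here is a minimal sketch in 
plain Java. It does not use the Storm API: Event is a hypothetical stand-in for 
a Storm tuple, and joinById is a hypothetical helper. In a real windowed bolt 
the list would presumably come from TupleWindow.get() and the source name from 
Tuple.getSourceComponent(). The sample data is the (10:11:14 - 10:11:16) window 
from your example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WindowJoinSketch {

    /** Hypothetical stand-in for a Storm tuple: emitting spout and join key. */
    static final class Event {
        final String source; // "spout1" or "spout2" (names taken from the grouping above)
        final int id;

        Event(String source, int id) {
            this.source = source;
            this.id = id;
        }
    }

    /**
     * Joins the events of ONE window by id.
     * Returns id -> {count from spout1, count from spout2},
     * keeping only ids that were seen from both spouts.
     */
    static Map<Integer, int[]> joinById(List<Event> window) {
        Map<Integer, int[]> counts = new TreeMap<>();
        for (Event e : window) {
            int[] c = counts.computeIfAbsent(e.id, k -> new int[2]);
            if ("spout1".equals(e.source)) {
                c[0]++;
            } else if ("spout2".equals(e.source)) {
                c[1]++;
            }
        }
        // Inner join: drop ids that appeared in only one of the two streams.
        counts.values().removeIf(c -> c[0] == 0 || c[1] == 0);
        return counts;
    }

    public static void main(String[] args) {
        List<Event> window = new ArrayList<>();
        // Spout1: (2, 10:11:14), (1, 10:11:15), (2, 10:11:16)
        window.add(new Event("spout1", 2));
        window.add(new Event("spout1", 1));
        window.add(new Event("spout1", 2));
        // Spout2: (2, 10:11:14), (1, 10:11:15), (2, 10:11:16)
        window.add(new Event("spout2", 2));
        window.add(new Event("spout2", 1));
        window.add(new Event("spout2", 2));

        for (Map.Entry<Integer, int[]> e : joinById(window).entrySet()) {
            System.out.println("id=" + e.getKey()
                    + " spout1=" + e.getValue()[0]
                    + " spout2=" + e.getValue()[1]);
        }
    }
}
```

Because of the fields grouping on id, each bolt task only sees the ids routed 
to it, so a per-task map like this should be enough for the join.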

Thanks,
Arun

From: Yifei Li
Reply-To: "[email protected]<mailto:[email protected]>"
Date: Tuesday, April 26, 2016 at 12:17 AM
To: "[email protected]<mailto:[email protected]>"
Subject: Storm join two streams based on event timestamp

Hi,

I am pretty new to Storm and I know that Storm now supports windowing based on 
event timestamp. I am wondering if it is possible to do the following join.

1. I have Spout1, which emits tuples with a timestamp.
2. I have Spout2, which emits tuples with a timestamp.
3. I have a bolt that accepts both Spout1 and Spout2 and processes tuples from 
Spout1 and Spout2 based on the event time window.

For example,
(First is id, second is timestamp)

Spout1(emits every second): (1, 10:11:12), (1, 10:11:13), (2, 10:11:14), (1, 
10:11:15), (2, 10:11:16)......

Spout2(emits every second): (2, 10:11:11), (1, 10:11:12), (3, 10:11:13), (2, 
10:11:14), (1, 10:11:15), (2, 10:11:16)......

For bolt, I set window to 3 seconds, interval to 3 seconds.

What I hope is that all the events (for both Spout1 and Spout2) that happened 
within
(10:11:10  - 10:11:13)
(10:11:14  - 10:11:16)
......

will be sent to the bolt so that, within each window, I can join the two 
streams by id and count the number of matching ids.

Is it possible to do that? If yes, can you point me to an example of how 
to do this? I tried it on my local machine. I can do that for one stream, but 
when I have two streams, I get exceptions.

Any suggestion/ideas will be appreciated.

Thanks,

Yifei

