Hi Yifei,

You could do a fields grouping on the join field (say, id) so that tuples with the same id end up in the same bolt task, i.e. something like:

    topologyBuilder.setBolt("bolt", windowedBolt, parallelism_hint)
        .fieldsGrouping("spout1", new Fields("id"))
        .fieldsGrouping("spout2", new Fields("id"));

You will have to do the join within the windowedBolt's execute method, where, every 3 seconds, you will receive the tuples from the last 3 seconds from both spouts.

Thanks,
Arun

From: Yifei Li
Reply-To: "[email protected]"
Date: Tuesday, April 26, 2016 at 12:17 AM
To: "[email protected]"
Subject: Storm join two streams based on event timestamp

Hi,

I am pretty new to Storm, and I know that Storm now supports windowing based on event timestamps. I am wondering whether it is possible to do the following join:

1. I have Spout1, which emits tuples with a timestamp.
2. I have Spout2, which emits tuples with a timestamp.
3. I have a bolt that accepts both Spout1 and Spout2 and processes tuples from Spout1 and Spout2 based on the event-time window.

For example (the first field is the id, the second is the timestamp):

Spout1 (emits every second): (1, 10:11:12), (1, 10:11:13), (2, 10:11:14), (1, 10:11:15), (2, 10:11:16), ...
Spout2 (emits every second): (2, 10:11:11), (1, 10:11:12), (3, 10:11:13), (2, 10:11:14), (1, 10:11:15), (2, 10:11:16), ...

For the bolt, I set the window to 3 seconds and the interval to 3 seconds. What I hope is that all the events (from both Spout1 and Spout2) that happened within (10:11:10 - 10:11:13), (10:11:14 - 10:11:16), ... will be sent to the bolt, so that within each window I can join the two streams by id and count the number of matching ids.

Is it possible to do that? If yes, can you point me to an example of how to do this? I tried it on my local machine: I can do it for one stream, but when I have two streams, I get exceptions.

Any suggestions/ideas will be appreciated.

Thanks,
Yifei
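A minimal sketch of the per-window join Arun describes, written in plain Java so it runs without Storm on the classpath. The Event class, the joinCounts method, and the source names "spout1"/"spout2" are hypothetical. What "count" means is also an assumption: here it is the number of pairs an inner join on id produces for each id. Inside a real windowed bolt you would presumably build the input from inputWindow.get() in execute(TupleWindow inputWindow), using each tuple's getSourceComponent() to tell the two spouts apart.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowJoinSketch {

    /** Hypothetical stand-in for a Storm tuple: the spout it came from and its join key. */
    static final class Event {
        final String source;
        final int id;

        Event(String source, int id) {
            this.source = source;
            this.id = id;
        }
    }

    /**
     * Inner-joins the events of one window on id. For every id seen on both spouts,
     * returns the number of joined (spout1, spout2) pairs; ids seen on only one
     * spout are dropped, as in an inner join.
     */
    static Map<Integer, Integer> joinCounts(List<Event> window) {
        Map<Integer, Integer> left = new TreeMap<>();
        Map<Integer, Integer> right = new TreeMap<>();
        for (Event e : window) {
            // Tally ids per input stream; anything not from spout1 is treated as spout2.
            Map<Integer, Integer> side = "spout1".equals(e.source) ? left : right;
            side.merge(e.id, 1, Integer::sum);
        }
        Map<Integer, Integer> joined = new TreeMap<>();
        for (Map.Entry<Integer, Integer> entry : left.entrySet()) {
            Integer rightCount = right.get(entry.getKey());
            if (rightCount != null) {
                // m events on spout1 and n on spout2 with the same id join into m * n pairs.
                joined.put(entry.getKey(), entry.getValue() * rightCount);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Yifei's sample events from 10:11:14 to 10:11:16 on both spouts.
        List<Event> window = List.of(
                new Event("spout1", 2), new Event("spout1", 1), new Event("spout1", 2),
                new Event("spout2", 2), new Event("spout2", 1), new Event("spout2", 2));
        System.out.println(joinCounts(window)); // prints {1=1, 2=4}
    }
}
```

Because the fields grouping routes every tuple with a given id to the same task, each task can compute these counts from its own window without needing tuples held by other tasks.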
