[ https://issues.apache.org/jira/browse/STORM-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
P. Taylor Goetz resolved STORM-2334.
------------------------------------
Resolution: Fixed

> Bolt for Joining streams
> ------------------------
>
> Key: STORM-2334
> URL: https://issues.apache.org/jira/browse/STORM-2334
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 2.0.0, 1.x
> Reporter: Roshan Naik
> Assignee: Roshan Naik
> Fix For: 2.0.0, 1.1.0
>
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> Create a general-purpose windowed bolt that performs joins on multiple data
> streams.
>
> Depending on the topology configuration, the bolt may receive data either on
> 'default' streams or on named streams, so the join bolt should be able to
> differentiate incoming data by the names of upstream components as well as
> by stream names.
>
> *Example:*
> The following SQL-style join involving 4 tables:
> {code}
> select userId, key4, key2, key3
> from stream1
> join stream2 on stream2.userId = stream1.key1
> join stream3 on stream3.key3 = stream2.userId
> left join stream4 on stream4.key4 = stream3.key3
> {code}
> could be expressed using the Join Bolt over 4 named streams as:
> {code}
> // 'STREAM' indicates that stream1/2/3/4 are stream names; 'key1' is the key on which stream1 is joined
> new JoinBolt(STREAM, "stream1", "key1")
>     .join("stream2", "userId", "stream1")   // join stream2 on stream2.userId = stream1.key1
>     .join("stream3", "key3", "stream2")     // join stream3 on stream3.key3 = stream2.userId
>     .leftjoin("stream4", "key4", "stream3") // left join stream4 on stream4.key4 = stream3.key3
>     .select("userId, key4, key2, key3")     // choose output fields
>     .withWindowLength(..)
>     .withSlidingInterval(..);
> {code}
> Or, based on named source components:
> {code}
> // 'SOURCE' indicates that kafkaSpout1, hdfsSpout3, etc. are names of upstream components
> new JoinBolt(SOURCE, "kafkaSpout1", "key1")
>     .join("kafkaSpout2", "userId", "kafkaSpout1")
>     .join("hdfsSpout3", "key3", "kafkaSpout2")
>     .leftjoin("mqttSpout1", "key4", "hdfsSpout3")
>     .select("userId, key4, key2, key3")
>     .withWindowLength(..)
>     .withSlidingInterval(..);
> {code}
> For tuples to be joined correctly, 'fields grouping' should be used on the
> incoming streams. Each stream should be grouped on the same key on which it
> will be joined against other streams. This is a restriction compared to SQL,
> which allows a table to be joined with others on any key and on any number of
> keys.
>
> *For example:* If 'Stream1' is fields-grouped on 'key1', we cannot use a
> different key such as 'key2' on 'Stream1' to join it with other streams, as
> the following SQL does. 'Stream1' can, however, be joined on the same key
> with multiple other streams.
> {code}
> select ....
> from stream1
> join stream2 on stream2.userId = stream1.key1
> join stream3 on stream3.key3 = stream1.key2 -- not supported by the Join Bolt
> {code}
> Consequently, the join bolt's syntax is somewhat simpler than SQL's. The key
> name for any given stream appears only once, when the stream is first
> introduced in the join; thereafter that key is used implicitly for joining.
> See how 'stream3' is joined with both 'stream2' and 'stream4' in the first
> example.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
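The join chain in the first example can be read as a sequence of hash joins over the tuples buffered in one window. The Java sketch below models that reading with plain collections only. It is not Storm's JoinBolt: the class name WindowJoinSketch, the tuple-as-Map representation, and the rule that select() takes each field from the first joined tuple containing it (which assumes field names are unique across streams) are all illustrative assumptions. Each stream's join key is recorded when the stream is introduced and then reused implicitly, mirroring the simplified syntax described in the issue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of windowed join semantics; NOT Storm's JoinBolt.
// A "tuple" is a Map of field name -> value; the window maps stream name -> tuples.
final class WindowJoinSketch {
    private final Map<String, List<Map<String, Object>>> window;
    // Join key of each stream, recorded once when the stream is introduced.
    private final Map<String, String> keyOf = new HashMap<>();
    // Each partial result maps stream name -> tuple (null after an unmatched left join).
    private List<Map<String, Map<String, Object>>> rows = new ArrayList<>();

    WindowJoinSketch(Map<String, List<Map<String, Object>>> window, String stream, String key) {
        this.window = window;
        keyOf.put(stream, key);
        for (Map<String, Object> t : window.getOrDefault(stream, List.of())) {
            Map<String, Map<String, Object>> row = new LinkedHashMap<>();
            row.put(stream, t);
            rows.add(row);
        }
    }

    WindowJoinSketch join(String stream, String key, String prior) { return hashJoin(stream, key, prior, false); }

    WindowJoinSketch leftJoin(String stream, String key, String prior) { return hashJoin(stream, key, prior, true); }

    private WindowJoinSketch hashJoin(String stream, String key, String prior, boolean keepUnmatched) {
        keyOf.put(stream, key);
        // Build: index the new stream's tuples by their join key.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> t : window.getOrDefault(stream, List.of())) {
            if (t.get(key) != null) index.computeIfAbsent(t.get(key), k -> new ArrayList<>()).add(t);
        }
        // Probe: look up each partial row using the prior stream's implicit key.
        List<Map<String, Map<String, Object>>> next = new ArrayList<>();
        for (Map<String, Map<String, Object>> row : rows) {
            Map<String, Object> priorTuple = row.get(prior);
            Object probe = priorTuple == null ? null : priorTuple.get(keyOf.get(prior));
            List<Map<String, Object>> matches = probe == null ? List.of() : index.getOrDefault(probe, List.of());
            if (matches.isEmpty() && keepUnmatched) { // left join keeps the row without a match
                Map<String, Map<String, Object>> r = new LinkedHashMap<>(row);
                r.put(stream, null);
                next.add(r);
            }
            for (Map<String, Object> m : matches) {
                Map<String, Map<String, Object>> r = new LinkedHashMap<>(row);
                r.put(stream, m);
                next.add(r);
            }
        }
        rows = next;
        return this;
    }

    // Projects a comma-separated field list, taking each field from the first joined tuple that has it.
    List<Map<String, Object>> select(String fields) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Map<String, Object>> row : rows) {
            Map<String, Object> o = new LinkedHashMap<>();
            for (String f : fields.trim().split("\\s*,\\s*")) {
                Object v = null;
                for (Map<String, Object> t : row.values()) {
                    if (t != null && t.containsKey(f)) { v = t.get(f); break; }
                }
                o.put(f, v);
            }
            out.add(o);
        }
        return out;
    }

    private static Map<String, Object> tuple(Object... kv) {
        Map<String, Object> t = new LinkedHashMap<>();
        for (int i = 0; i < kv.length; i += 2) t.put((String) kv[i], kv[i + 1]);
        return t;
    }

    static List<Map<String, Object>> demo() {
        Map<String, List<Map<String, Object>>> window = new HashMap<>();
        window.put("stream1", List.of(tuple("key1", 1, "key2", "a"), tuple("key1", 2, "key2", "b")));
        window.put("stream2", List.of(tuple("userId", 1), tuple("userId", 2)));
        window.put("stream3", List.of(tuple("key3", 1), tuple("key3", 2)));
        window.put("stream4", List.of(tuple("key4", 1)));
        return new WindowJoinSketch(window, "stream1", "key1")
                .join("stream2", "userId", "stream1")
                .join("stream3", "key3", "stream2")
                .leftJoin("stream4", "key4", "stream3")
                .select("userId, key4, key2, key3");
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main prints {userId=1, key4=1, key2=a, key3=1} and {userId=2, key4=null, key2=b, key3=2}. The second row survives only because the last step is a left join: no stream4 tuple has key4 = 2, so key4 is left null instead of the row being dropped.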
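The fields-grouping restriction can be illustrated with a simplified routing model: each tuple goes to the task selected by hashing the value of its grouping field, so tuples with equal join keys meet on the same task only if each stream was grouped on the key actually used in the join. This sketch is hypothetical. FieldsGroupingSketch, taskFor, and the Math.floorMod(hashCode, numTasks) formula are assumptions, not Storm's exact grouper. The values follow the unsupported SQL example, where stream1 is grouped on key1 but stream3 would need to match stream1.key2.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of fields grouping (not Storm's exact grouper):
// a tuple is routed to the task picked by hashing the value of its grouping field.
final class FieldsGroupingSketch {
    static int taskFor(Map<String, Object> tuple, String groupingField, int numTasks) {
        return Math.floorMod(Objects.hashCode(tuple.get(groupingField)), numTasks);
    }

    // Returns the task chosen for one stream1, one stream2 and one stream3 tuple.
    static int[] demo(int numTasks) {
        Map<String, Object> s1 = Map.of("key1", 7, "key2", 42); // stream1, grouped on key1
        Map<String, Object> s2 = Map.of("userId", 7);           // stream2, grouped on userId
        Map<String, Object> s3 = Map.of("key3", 42);            // stream3, grouped on key3
        return new int[] {
            taskFor(s1, "key1", numTasks),
            taskFor(s2, "userId", numTasks),
            taskFor(s3, "key3", numTasks)
        };
    }

    public static void main(String[] args) {
        int[] task = demo(4);
        // stream2.userId = stream1.key1 (both 7): equal values hash alike, so same task (3).
        System.out.println("stream1 -> task " + task[0] + ", stream2 -> task " + task[1]);
        // stream3.key3 = stream1.key2 (both 42), but stream1 was routed by key1,
        // so with 4 tasks these matching tuples land on different tasks (3 vs 2).
        System.out.println("stream3 -> task " + task[2]);
    }
}
```

Because the task depends only on the grouped value, stream1 and stream2 agree for any task count. Whether stream1 and stream3 happen to meet is a matter of chance (with 5 tasks, 7 and 42 both map to task 2), which is why the bolt cannot rely on joining a stream on a key other than the one it is grouped on.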