[ 
https://issues.apache.org/jira/browse/STORM-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

P. Taylor Goetz resolved STORM-2334.
------------------------------------
    Resolution: Fixed

> Bolt for Joining streams
> ------------------------
>
>                 Key: STORM-2334
>                 URL: https://issues.apache.org/jira/browse/STORM-2334
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Roshan Naik
>            Assignee: Roshan Naik
>             Fix For: 2.0.0, 1.1.0
>
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Create a general-purpose windowed bolt that performs joins on multiple data 
> streams.
> Depending on the topology configuration, the bolt may receive data either on 
> 'default' streams or on named streams, so the join bolt should be able to 
> differentiate the incoming data based on the names of upstream components as 
> well as stream names.
> *Example:*
> The following SQL-style join involving 4 tables:
> {code}
> select  userId, key4, key2, key3
> from stream1 
> join       stream2  on stream2.userId =  stream1.key1
> join       stream3  on stream3.key3   =  stream2.userId
> left join  stream4  on stream4.key4   =  stream3.key3
> {code}
> could be expressed using the Join Bolt over 4 named streams as:
> {code}
> // 'STREAM' indicates that stream1/2/3/4 are names of streams.
> // 'key1' is the key on which stream1 is joined.
> new JoinBolt(STREAM, "stream1", "key1")
>      // join stream2 on stream2.userId = stream1.key1
>      .join     ("stream2", "userId",  "stream1")
>      // join stream3 on stream3.key3 = stream2.userId
>      .join     ("stream3", "key3",    "stream2")
>      // left join stream4 on stream4.key4 = stream3.key3
>      .leftjoin ("stream4", "key4",    "stream3")
>      // choose output fields
>      .select("userId, key4, key2, key3")
>      .withWindowLength(..)
>      .withSlidingInterval(..);
> {code}
> Or based on named source components:
> {code}
> // 'SOURCE' indicates that kafkaSpout1, hdfsSpout3, etc. are names of 
> // upstream components.
> new JoinBolt(SOURCE, "kafkaSpout1", "key1")
>      .join     ("kafkaSpout2", "userId",    "kafkaSpout1" )    
>      .join     ("hdfsSpout3",  "key3",      "kafkaSpout2")
>      .leftjoin ("mqttSpout1",  "key4",      "hdfsSpout3")
>      .select ("userId, key4, key2, key3")
>      .withWindowLength(..)
>      .withSlidingInterval(..);
> {code}
> For tuples to be joined correctly, 'fields grouping' should be employed on 
> the incoming streams. Each stream should be grouped on the same key that is 
> used to join it against other streams. This is a restriction compared to SQL, 
> which allows joining a table with others on any key and any number of keys.
> *For example:* If 'Stream1' is fields grouped on 'key1', we cannot use a 
> different key, 'key2', on 'Stream1' to join it with other streams. 'Stream1' 
> can, however, be joined with multiple other streams using the same key. The 
> following SQL joins 'stream1' on two different keys and therefore cannot be 
> expressed with the Join Bolt:
> {code}
> select ....
> from stream1 
> join  stream2  on stream2.userId =  stream1.key1
> join  stream3  on stream3.key3   =  stream1.key2  // not supportable in Join Bolt
> {code}
> Consequently, the join bolt's syntax is somewhat simplified compared to SQL. 
> The key name for any given stream appears only once, when the stream is first 
> introduced in the join. Thereafter, that key is implicitly used for joining. 
> See the case of 'stream3' being joined with both 'stream2' and 'stream4' in 
> the first example.
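
As a rough illustration of the fields-grouping requirement described above, the following plain-Java sketch (no Storm dependency) models fields grouping as "hash of the grouping field value, modulo the number of tasks". The class FieldsGroupingSketch, the helpers taskFor and meetAtSameTask, the task count of 4, and the sample tuple values are all assumptions made for illustration; Storm's actual partitioning may differ in detail. The sketch only shows that two tuples with equal join-key values are guaranteed to reach the same bolt task when each stream is grouped on its join key, and may miss each other otherwise.

```java
import java.util.Map;
import java.util.Objects;

public class FieldsGroupingSketch {

    // Hypothetical stand-in for fields grouping: the task index depends
    // only on the value of the grouping field.
    static int taskFor(Map<String, Object> tuple, String groupingField, int numTasks) {
        return Math.floorMod(Objects.hashCode(tuple.get(groupingField)), numTasks);
    }

    // True when tuple a (grouped on fieldA) and tuple b (grouped on fieldB)
    // are routed to the same task and can therefore be joined there.
    static boolean meetAtSameTask(Map<String, Object> a, String fieldA,
                                  Map<String, Object> b, String fieldB,
                                  int numTasks) {
        return taskFor(a, fieldA, numTasks) == taskFor(b, fieldB, numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // assumed parallelism of the join bolt

        // Sample tuples for: join stream2 on stream2.userId = stream1.key1
        Map<String, Object> stream1Tuple = Map.of("key1", 42, "key2", 7);
        Map<String, Object> stream2Tuple = Map.of("userId", 42);

        // Both streams grouped on their join keys: equal values, same task.
        System.out.println(
            meetAtSameTask(stream1Tuple, "key1", stream2Tuple, "userId", numTasks)); // true

        // stream1 grouped on a different field (key2): the matching tuples
        // land on different tasks in this example, so the join would miss.
        System.out.println(
            meetAtSameTask(stream1Tuple, "key2", stream2Tuple, "userId", numTasks)); // false
    }
}
```

With equal join-key values the first check always holds, whatever the task count. The second check depends on the sample values and is false here only because 7 and 42 happen to map to different tasks when there are 4 of them.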



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
