Hi Romeo, Thanks for reporting that. It’s a bug, and your approach for a fix is correct.
If you’d like, feel free to open a JIRA and optionally a pull request for a fix. Otherwise, I can take care of it.

-Taylor

On Jun 17, 2015, at 4:07 PM, Romeo Nocon <[email protected]> wrote:

> Hi,
>
> I'm testing migrating over a topology I have to flux. The
>
> spout:
>   - id: "spout"
>
> bolts:
>   - id: "bolt_A"
>     className: "com.blah.boltA"
>     parallelism: 1
>   - id: "bolt_B"
>     className: "com.blah.boltB"
>     parallelism: 1
>   - id: "bolt_C"
>     className: "com.blah.boltC"
>     parallelism: 1
>   - id: "bolt_D"
>     className: "com.blah.boltD"
>     parallelism: 1
>
> streams:
>   - name: ""
>     from: "spout"
>     to: "bolt_A"
>     grouping:
>       type: SHUFFLE
>   - name: "A-->B"
>     from: "bolt_A"
>     to: "bolt_B"
>     grouping:
>       streamId: "forB"
>   - name: "A-->C"
>     from: "bolt_A"
>     to: "bolt_C"
>     grouping:
>       streamId: "forC"
>   - name: "B-->D"
>     from: "bolt_B"
>     to: "bolt_D"
>   - name: "C-->D"
>     from: "bolt_C"
>     to: "bolt_D"
>
> It builds something like below (imagine the arrow from A-> B, A-> C,
> B->D, and C->D)
> ---------------------------------------------------------
>                    Bolt_B
> Spout -> Bolt_A ->          Bolt_D
>                    Bolt_C
> ---------------------------------------------------------
>
> I get an error below in FLUX.
>
> Exception in thread "main" java.lang.IllegalArgumentException: Bolt has already been declared for id bolt_D
>     at backtype.storm.topology.TopologyBuilder.validateUnusedId(TopologyBuilder.java:212)
>     at backtype.storm.topology.TopologyBuilder.setBolt(TopologyBuilder.java:139)
>     at org.apache.storm.flux.FluxBuilder.buildStreamDefinitions(FluxBuilder.java:158)
>     at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:94)
>     at org.apache.storm.flux.Flux.runCli(Flux.java:153)
>     at org.apache.storm.flux.Flux.main(Flux.java:98)
>
> Looking at the buildStreamDefinitions code in the FluxBuilder it
> iterates through each of the defined streams then calls the
> appropriate
>
> builder.setBolt(stream.getTo()...).
>
> Since I have two streams going to Bolt_D it ends up getting the error
> above. Does someone have a patch or fix out there already?
>
> A possible fix is to cache the BoltDeclarer by getTo() id then skip
> the builder.setBolt method so the code can continue setting the
> different types of groupings on the rest of streams. Just a thought.
>
> Thanks,
> Romeo
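
For anyone picking this up, the caching idea described above could look roughly like the sketch below. It is self-contained and deliberately does not use Storm: `Builder`, `Declarer`, and `wire` are hypothetical stand-ins that only mimic the "already declared" check seen in the stack trace, so this shows the pattern and is not the actual FluxBuilder code or patch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeclarerCacheSketch {

    // Hypothetical stand-in for a bolt declarer: it only records input sources.
    static final class Declarer {
        final String boltId;
        final List<String> inputs = new ArrayList<>();

        Declarer(String boltId) {
            this.boltId = boltId;
        }
    }

    // Hypothetical stand-in for a topology builder that rejects duplicate ids.
    static final class Builder {
        private final Map<String, Declarer> bolts = new HashMap<>();

        Declarer setBolt(String id) {
            if (bolts.containsKey(id)) {
                throw new IllegalArgumentException(
                        "Bolt has already been declared for id " + id);
            }
            Declarer declarer = new Declarer(id);
            bolts.put(id, declarer);
            return declarer;
        }
    }

    // Each stream is a {from, to} pair. setBolt is called only the first time
    // a target id is seen; later streams reuse the cached declarer.
    static Map<String, Declarer> wire(Builder builder, String[][] streams) {
        Map<String, Declarer> declarers = new HashMap<>();
        for (String[] stream : streams) {
            String from = stream[0];
            String to = stream[1];
            Declarer declarer = declarers.get(to);
            if (declarer == null) {
                declarer = builder.setBolt(to);
                declarers.put(to, declarer);
            }
            // In the real code this is where the grouping would be applied.
            declarer.inputs.add(from);
        }
        return declarers;
    }

    public static void main(String[] args) {
        String[][] streams = {
            {"spout", "bolt_A"},
            {"bolt_A", "bolt_B"},
            {"bolt_A", "bolt_C"},
            {"bolt_B", "bolt_D"},
            {"bolt_C", "bolt_D"},
        };
        Map<String, Declarer> declarers = wire(new Builder(), streams);
        System.out.println("bolt_D inputs: " + declarers.get("bolt_D").inputs);
    }
}
```

Without the cache, the fifth stream would call `setBolt("bolt_D")` a second time and hit the same kind of exception as in the report. With it, both B-->D and C-->D attach to the one declarer.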
