Hi,

I'm testing the migration of an existing topology to Flux.  The YAML
definition is below.

spouts:
  - id: "spout"

bolts:
  - id: "bolt_A"
    className: "com.blah.boltA"
    parallelism: 1
  - id: "bolt_B"
    className: "com.blah.boltB"
    parallelism: 1
  - id: "bolt_C"
    className: "com.blah.boltC"
    parallelism: 1
  - id: "bolt_D"
    className: "com.blah.boltD"
    parallelism: 1

streams:
  - name: ""
    from: "spout"
    to: "bolt_A"
    grouping:
      type: SHUFFLE
  - name: "A-->B"
    from: "bolt_A"
    to: "bolt_B"
    grouping:
      streamId: "forB"
  - name: "A-->C"
    from: "bolt_A"
    to: "bolt_C"
    grouping:
      streamId: "forC"
  - name: "B-->D"
    from: "bolt_B"
    to: "bolt_D"
  - name: "C-->D"
    from: "bolt_C"
    to: "bolt_D"

It should build something like the diagram below (edges A->B, A->C,
B->D, and C->D):
---------------------------------------------------------
                  +-> Bolt_B -+
Spout -> Bolt_A --+           +-> Bolt_D
                  +-> Bolt_C -+
---------------------------------------------------------

I get the error below from Flux.

Exception in thread "main" java.lang.IllegalArgumentException: Bolt has already been declared for id bolt_D
    at backtype.storm.topology.TopologyBuilder.validateUnusedId(TopologyBuilder.java:212)
    at backtype.storm.topology.TopologyBuilder.setBolt(TopologyBuilder.java:139)
    at org.apache.storm.flux.FluxBuilder.buildStreamDefinitions(FluxBuilder.java:158)
    at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:94)
    at org.apache.storm.flux.Flux.runCli(Flux.java:153)
    at org.apache.storm.flux.Flux.main(Flux.java:98)

Looking at the buildStreamDefinitions code in FluxBuilder, it
iterates over each of the defined streams and, for each one, calls

     builder.setBolt(stream.getTo()...).

Since I have two streams going to bolt_D, setBolt ends up being called
twice for the same id, which produces the error above.  Does someone
have a patch or fix out there already?

A possible fix is to cache each BoltDeclarer by its getTo() id and
skip builder.setBolt when that id has already been declared, so the
code can continue setting the different types of groupings for the
rest of the streams.  Just a thought.
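
To make the idea concrete, here is a minimal sketch of the caching
approach.  It deliberately does not use the real Storm or Flux classes:
FakeBuilder and FakeDeclarer are hypothetical stand-ins I made up for
TopologyBuilder and BoltDeclarer, every stream is treated as a shuffle
grouping, and wire() is only an illustration of the loop, not the
actual buildStreamDefinitions code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeclarerCacheSketch {

    /** Hypothetical stand-in for BoltDeclarer: records the sources grouped onto one bolt. */
    static class FakeDeclarer {
        final List<String> sources = new ArrayList<>();

        void shuffleGrouping(String fromId) {
            sources.add(fromId);
        }
    }

    /** Hypothetical stand-in for TopologyBuilder: rejects a second setBolt for the same id. */
    static class FakeBuilder {
        final Map<String, FakeDeclarer> bolts = new HashMap<>();

        FakeDeclarer setBolt(String id) {
            if (bolts.containsKey(id)) {
                throw new IllegalArgumentException(
                        "Bolt has already been declared for id " + id);
            }
            FakeDeclarer declarer = new FakeDeclarer();
            bolts.put(id, declarer);
            return declarer;
        }
    }

    /** Wires {from, to} pairs, calling setBolt only once per target id. */
    static FakeBuilder wire(String[][] streams) {
        FakeBuilder builder = new FakeBuilder();
        // Cache of declarers keyed by the stream's target ("to") id.
        Map<String, FakeDeclarer> declarers = new HashMap<>();
        for (String[] stream : streams) {
            String from = stream[0];
            String to = stream[1];
            FakeDeclarer declarer = declarers.get(to);
            if (declarer == null) {
                // First stream into this bolt: declare it and remember the declarer.
                declarer = builder.setBolt(to);
                declarers.put(to, declarer);
            }
            // Every stream, first or not, adds its grouping to the cached declarer.
            declarer.shuffleGrouping(from);
        }
        return builder;
    }

    public static void main(String[] args) {
        String[][] streams = {
            {"spout", "bolt_A"}, {"bolt_A", "bolt_B"}, {"bolt_A", "bolt_C"},
            {"bolt_B", "bolt_D"}, {"bolt_C", "bolt_D"}
        };
        FakeBuilder builder = wire(streams);
        System.out.println(builder.bolts.get("bolt_D").sources); // prints [bolt_B, bolt_C]
    }
}
```

With the cache in place, the second stream into bolt_D reuses the
existing declarer instead of calling setBolt again, so both groupings
land on the same bolt and no exception is thrown.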

Thanks,
Romeo
