Hi,
I'm testing migrating over a topology I have to flux. The
spout:
- id: "spout"
bolts:
- id: "bolt_A"
className: "com.blah.boltA"
parallelism: 1
- id: "bolt_B"
className: "com.blah.boltB"
parallelism: 1
- id: "bolt_C"
className: "com.blah.boltC"
parallelism: 1
- id: "bolt_D"
className: "com.blah.boltD"
parallelism: 1
streams:
- name: ""
from: "spout"
to: "bolt_A"
grouping:
type: SHUFFLE
- name: "A-->B"
from: "bolt_A"
to: "bolt_B"
grouping:
streamId: "forB"
- name: "A-->C"
from: "bolt_A"
to: "bolt_C"
grouping:
streamId: "forC"
- name: "B-->D"
from: "bolt_B"
to: "bolt_D"
- name: "C-->D"
from: "bolt_C"
to: "bolt_D"
It builds something like below (imagine the arrow from A-> B, A-> C,
B->D, and C->D)
---------------------------------------------------------
Bolt_B
Spout -> Bolt_A -> Bolt_D
Bolt_C
---------------------------------------------------------
I get an error below in FLUX.
Exception in thread "main" java.lang.IllegalArgumentException: Bolt
has already been declared for id bolt_D
at
backtype.storm.topology.TopologyBuilder.validateUnusedId(TopologyBuilder.java:212)
at backtype.storm.topology.TopologyBuilder.setBolt(TopologyBuilder.java:139)
at
org.apache.storm.flux.FluxBuilder.buildStreamDefinitions(FluxBuilder.java:158)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:94)
at org.apache.storm.flux.Flux.runCli(Flux.java:153)
at org.apache.storm.flux.Flux.main(Flux.java:98)
Looking at the buildStreamDefinitions code in the FluxBuilder it
iterates through each of the defined streams then calls the
appropriate
builder.setBolt(stream.getTo()...).
Since I have two streams going to Bolt_D it ends up getting the error
above. Does someone have a patch or fix out there already?
A possible fix is to cache the BoltDeclarer by getTo() id then skip
the builder.setBolt method so the code can continue setting the
different types of groupings on the rest of streams. Just a thought.
Thanks,
Romeo