Hi Javier,
I am using a LocalCluster on my dev machine, where I work in Eclipse.
Here, I am passing Spring's ApplicationContext as a constructor argument to
the spouts and bolts.

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
    builder.setBolt("mapBolt", new GroupingBolt(appContext), 10)
           .shuffleGrouping("rabbitMqSpout");
    builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10)
           .shuffleGrouping("mapBolt");

    Config conf = new Config();
    conf.registerSerialization(EventBean.class);
    conf.registerSerialization(InputQueueManagerImpl.class);
    conf.setDebug(true);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());

In my spouts and bolts, I make the ApplicationContext variable static, so
when the topology is launched by cluster.submitTopology, my context is still
available:

    private static ApplicationContext ctx;

    public RabbitListnerSpout(ApplicationContext appContext) {
        LOG.info("RabbitListner Constructor called");
        ctx = appContext;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        LOG.info("Inside the open Method for RabbitListner Spout");
        inputManager = (InputQueueManagerImpl) ctx.getBean(InputQueueManagerImpl.class);
        notificationManager = (NotificationQueueManagerImpl) ctx
                .getBean(NotificationQueueManagerImpl.class);
        eventExchange = ctx.getEnvironment().getProperty("input.rabbitmq.events.exchange");
        routingKey = ctx.getEnvironment().getProperty("input.rabbitmq.events.routingKey");
        eventQueue = ctx.getEnvironment().getProperty("input.rabbitmq.events.queue");
        _collector = collector;
        LOG.info("Exiting the open Method for RabbitListner Spout");
    }

This is working like a charm (my ApplicationContext is initialized
separately). As we all know, ApplicationContext is not serializable, but
this works well in LocalCluster.
My assumption is that it will work on a separate cluster too. Is my
assumption correct?
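
One thing worth checking before relying on this: as far as I understand, Storm serializes spout and bolt instances with Java serialization when the topology is submitted, and Java serialization does not write static fields. In a LocalCluster everything runs in one JVM, so the static set by the constructor is still there when open() runs. The sketch below is only an illustration under that assumption. StaticFieldDemo and FakeSpout are hypothetical names, not Storm classes, and setting the static to null merely simulates a worker JVM in which the constructor never ran.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldDemo {

    // Hypothetical stand-in for a spout; it is NOT a Storm class.
    static class FakeSpout implements Serializable {
        private static final long serialVersionUID = 1L;

        static Object ctx;   // static: not written by Java serialization
        final String name;   // instance field: written as part of the object

        FakeSpout(Object appContext, String name) {
            ctx = appContext;
            this.name = name;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Returns true only if deserialization restored the static field.
    static boolean staticSurvivesRoundTrip() {
        try {
            FakeSpout original = new FakeSpout(new Object(), "rabbitMqSpout");
            byte[] wire = serialize(original);

            // Simulate a separate worker JVM: the constructor never ran there,
            // so the static field starts out unset.
            FakeSpout.ctx = null;

            FakeSpout copy = (FakeSpout) deserialize(wire);
            if (!"rabbitMqSpout".equals(copy.name)) {
                throw new IllegalStateException("instance field was not restored");
            }
            return FakeSpout.ctx != null;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static ctx restored by deserialization: "
                + staticSurvivesRoundTrip());
    }
}
```

Running main should report false: the instance field comes back, the static one does not. If the same holds for a spout on a remote worker, ctx would be null when open() is called there, so the context would have to be created (or looked up) inside open()/prepare() on the worker itself.
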
On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <[email protected]> wrote:
> IIRC, only if everything you use in your spouts and bolts is serializable.
> On Oct 6, 2015 11:29 PM, "Ankur Garg" <[email protected]> wrote:
>
>> Hi Ravi,
>>
>> I was able to integrate with Spring, but the problem is that I have to
>> autowire every bolt and spout. That means that even if I parallelize a
>> spout or bolt, the autowiring is started for each instance. Is there some
>> way to do this only once for bolts and spouts (I mean, if I parallelize
>> bolts or spouts individually, can they share the configuration from
>> somewhere)? Is this possible?
>>
>> Thanks
>> Ankur
>>
>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <[email protected]> wrote:
>>
>>> Yes, this works for annotations too.
>>>
>>> You can call this method in the prepare() method of every bolt and the
>>> open() method of every spout; make sure you don't use any autowired bean
>>> before this call.
>>>
>>> Ravi.
>>>
>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <[email protected]> wrote:
>>>
>>> > Hi Ravi,
>>> >
>>> > Thanks for your reply. I am using annotation-based configuration and
>>> > Spring Boot.
>>> >
>>> > Any idea how to do it using annotations?
>>> >
>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <[email protected]> wrote:
>>> >
>>> > > Bolts and spouts are created by Storm and are not known to the Spring
>>> > > context. You need to add them to the Spring context manually; there
>>> > > are a few methods available, e.g.
>>> > >
>>> > > SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this,
>>> > >     AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>> > >
>>> > > SpringContext is my own class, into which I have injected the Spring
>>> > > context, so SpringContext.getContext() returns the actual Spring
>>> > > context.
>>> > >
>>> > > Ravi.
>>> > >
>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <[email protected]> wrote:
>>> > >
>>> > > > Hi,
>>> > > >
>>> > > > I am building a Storm topology with a set of spouts and bolts, and I
>>> > > > am also using Spring for dependency injection.
>>> > > >
>>> > > > Unfortunately, none of my fields are getting autowired, even though
>>> > > > I have declared all my spouts and bolts as @Component.
>>> > > >
>>> > > > However, in the place where I declare my topology, Spring is working
>>> > > > fine.
>>> > > >
>>> > > > Is autowiring not working because
>>> > > > cluster.submitTopology("test", conf, builder.createTopology())
>>> > > > submits the topology to a cluster (locally, it spawns different
>>> > > > threads for spouts and bolts)?
>>> > > >
>>> > > > Please suggest.
>>> > > >
>>> > >
>>> >
>>>
>>
>>