Maybe I should give an example:

class MyBolt extends BaseRichBolt {

  @transient var injector: XYZModule = null

  override def prepare(stormConf: util.Map[_, _], context: TopologyContext,
collector: OutputCollector): Unit = {

    val theConfig: Config = getConfigFromStormConfig(stormConf)

    injector = new XYZModule {

      val config = theConfig

    }

  }


  override def cleanup(): Unit = {

    injector.lifetime.close(true)

  }


  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {

    // …

  }


  override def execute(input: Tuple): Unit = {

    try {

      // use xyzService from the injector

      injector.xyzService.doXYZ(….)

    } finally {

      collector.ack(input)

    }

  }

}


It is a while since I've touched Guice, so I don't remember the equivalent
method calls, etc but that should illustrate the idea.


Cheers,

John



On Tue, Oct 21, 2014 at 3:48 PM, John Reilly <[email protected]> wrote:

> The injector is created inside the prepare method.
>
> On Tue, Oct 21, 2014 at 1:44 PM, Stephen Armstrong <
> [email protected]> wrote:
>
>> I'm not understanding something here:
>>
>> If the bolt is pulling its dependencies from Guice inside it's prepare()
>> method, where does it get the injector? If it gets it from the constructor,
>> then the serialization issue still happens. If it gets it from a static
>> variable, then in the production environment, how does this static variable
>> get initialized on remote worker machines (since the normal topology
>> initialization code is only run on Nimbus). If the static variable is
>> initialized with static code, then how does the test code change it?
>>
>> On Tue, Oct 21, 2014 at 1:03 PM, John Reilly <[email protected]>
>> wrote:
>>
>>> To avoid problems like this, I use a dependency injection system which
>>> is initialized in the prepare method of the bolts.  In my case, I use
>>> macwire (in scala), but you should be able to use spring, guice or any
>>> other di system to achieve the same.
>>>
>>> Cheers,
>>> John
>>>
>>> On Tue, Oct 21, 2014 at 10:10 AM, Stephen Armstrong <
>>> [email protected]> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I've got a few topologies running, and have unit tests for each
>>>> bolt/spout in isolation that mock out the edges of the tests (Tuples and
>>>> OutputCollectors), but I want to have a full integration test. I setup
>>>> local mode using the following function:
>>>>
>>>>     public void runTopology(StormTopology topology, Config config, int
>>>> seconds) {
>>>>         long end = System.currentTimeMillis() + (seconds * 1000);
>>>>         LocalCluster cluster = new LocalCluster();
>>>>         cluster.submitTopology("Test topology", config, topology);
>>>>         try {
>>>>             while (System.currentTimeMillis() < end) {
>>>>                 Thread.sleep(10);
>>>>             }
>>>>             LOG.info("Finished run, exiting");
>>>>         } catch (InterruptedException e) {
>>>>             fail("Interrupted", e);
>>>>         }
>>>>         cluster.killTopology("Test topology");
>>>>         try {
>>>>             Thread.sleep(1000);
>>>>         } catch (InterruptedException e) {
>>>>             fail("Interrupted", e);
>>>>         }
>>>>         cluster.shutdown();
>>>>     }
>>>>
>>>> The problem arises from using EasyMock for dependencies injected into
>>>> the Bolts/Spouts, since this system serializes them. Is there a way to turn
>>>> off the serialization of the bolts for local mode, or does anyone have any
>>>> other advice?
>>>>
>>>> One obvious but really ugly hack is to store the EasyMock objects in
>>>> static variables, and have serializable proxy objects that simply pull the
>>>> mock from the static variable, but that sounds pretty hideous so I'd like
>>>> to avoid it.
>>>>
>>>> Thanks
>>>> Steve
>>>>
>>>
>>>
>>
>

Reply via email to