The Drools version I depend on is 7.2.0.Final. The dependency in pom.xml is:
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>7.2.0.Final</version>
</dependency>
And the bolt is:
public class DealLimitBolt extends BaseRichBolt {
private OutputCollector collector;
private KieSession kieSession;
public void execute(Tuple input) {
// Get the data
String sentence = (String) input.getValue(0);
// Convert the data
PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
// Look up the rule
String key = String.valueOf(dataPoint.tags.get("gatewayId")) +
String.valueOf(dataPoint.getTags().get("deviceId"))
+ dataPoint.getMetric() + "0";
RuleLimitParam paramObj = (RuleLimitParam) MapService.getObject(key);
LOGGER.info(Json.toJson(MapService.getObjects(), JsonFormat.tidy()));
if (paramObj != null) {
// The kieSession is used here
LimitFact fact = new LimitFact();
fact.setHigh(paramObj.high);
fact.setLow(paramObj.low);
fact.setOperate(paramObj.operate);
fact.setValue(Float.valueOf(dataPoint.value));
kieSession.insert(fact);
kieSession.fireAllRules();
}
collector.ack(input);
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
SpringTools instance = SpringTools.getInstance();
ApplicationContext applicationContext =
instance.getApplicationContext();
methodService = applicationContext.getBean(MethodService.class);
DroolsService droolsService =
applicationContext.getBean(DroolsService.class);
droolsService.getRules();
// This is where Drools is initialized.
KieServices ks = KieServices.Factory.get();
KieContainer kContainer = ks.getKieClasspathContainer();
kieSession = kContainer.newKieSession("all-rule");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
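For context on why the session is created in prepare(): as I understand it, Storm serializes the bolt instance when the topology is submitted and deserializes it on each worker, so state built before submission only arrives if it is serializable. Below is a plain-Java sketch of that behaviour. It uses no Storm or Drools classes; PrepareDemo, FakeBolt and roundTrip are names made up for illustration, and a StringBuilder stands in for the KieSession.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PrepareDemo {

    /** Hypothetical stand-in for a bolt; not a Storm class. */
    public static class FakeBolt implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable configuration travels with the instance.
        public final String sessionName;
        // Stand-in for a KieSession: transient, so it is skipped by serialization.
        public transient StringBuilder session;

        public FakeBolt(String sessionName) {
            this.sessionName = sessionName;
        }

        // Mirrors the role of prepare(): build runtime state on the worker side.
        public void prepare() {
            session = new StringBuilder(sessionName);
        }
    }

    /** Serializes and deserializes the bolt, imitating the hop to another JVM. */
    public static FakeBolt roundTrip(FakeBolt bolt) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(bolt);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FakeBolt) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeBolt submitted = new FakeBolt("all-rule");
        submitted.prepare();                       // set up on the submitting side
        FakeBolt onWorker = roundTrip(submitted);  // what the other side receives
        System.out.println("session after round trip: " + onWorker.session);
        onWorker.prepare();                        // re-created where it is used
        System.out.println("session after prepare():  " + onWorker.session);
    }
}
```

The sketch only demonstrates the transient-field case within a single JVM. Static state set in the submitting JVM cannot be shown this way, since a static field stays visible inside one process.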
Drools also uses an XML file at resources/META-INF/kmodule.xml:
<?xml version="1.0" encoding="UTF-8"?>
<kmodule xmlns="http://www.drools.org/xsd/kmodule">
<kbase name="rules" packages="com.rules">
<ksession name="all-rule"/>
</kbase>
</kmodule>
And I put the rules in resources/com/rules/DealLimit.drl.
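Since the trace quoted below fails inside ResourceFactory.newByteArrayResource while the KieBase is being built, one thing that may be worth checking is what the classpath actually contains once everything is packed into the jar-with-dependencies. The following is only a diagnostic sketch, not a fix. ClasspathCheck and locate are names made up for it, and the META-INF/kie.conf entry rests on my assumption that the Drools 7 modules each ship such a file, so that a merged jar could end up with fewer copies than the separate jars have.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClasspathCheck {

    /** Returns every classpath location that provides the given resource path. */
    public static List<URL> locate(ClassLoader loader, String path) throws IOException {
        return new ArrayList<>(Collections.list(loader.getResources(path)));
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        String[] paths = {
            "META-INF/kmodule.xml",       // the kmodule descriptor shown above
            "com/rules/DealLimit.drl",    // the rule file shown above
            "META-INF/kie.conf"           // assumption: per-module Drools service config
        };
        for (String path : paths) {
            List<URL> found = locate(loader, path);
            // An empty list means the resource is not visible to this class loader.
            System.out.println(path + " -> " + found.size() + " location(s): " + found);
        }
    }
}
```

Running the same check once from the IDE and once with only the assembled jar on the classpath would show whether the two environments see the same resources.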
2017-09-15 0:03 GMT+08:00 Stig Rohde Døssing <[email protected]>:
> There are a lot of differences between local cluster and production. The
> primary difference is that local clusters run all the code in one JVM,
> whereas a production cluster runs bolts spread across multiple worker JVMs,
> and the topology wiring code in a separate JVM on the Nimbus host.
>
> There is no initialization in this prepare method:
> @Override
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.collector = collector;
> }
>
> I think it's easier to help if you post the code you're using to
> initialize Drools, as well as which Drools artifact and version you're
> depending on.
>
> 2017-09-14 12:37 GMT+02:00 张博 <[email protected]>:
>
>> I only use Drools in the bolt, and I initialize it in the prepare method, so
>> I don't think that is the reason. It does run in the LocalCluster, though.
>> Do you know the difference between the LocalCluster and the production
>> cluster?
>>
>> 2017-09-13 23:27 GMT+08:00 Stig Rohde Døssing <[email protected]>:
>>
>>> I'm not familiar with Drools, so I'm just guessing here, but are you
>>> doing any kind of setup of the KieContainer before submitting your
>>> topology? When you run your topology the bolt doesn't run in the same JVM
>>> as the topology setup code, so any setup done via static variables/methods
>>> won't transfer from the submitter JVM to the bolt JVM.
>>>
>>> If you need to run code before starting a worker, you might want to look
>>> at
>>> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
>>> and
>>> https://storm.apache.org/releases/1.0.3/javadocs/org/apache/storm/topology/TopologyBuilder.html#addWorkerHook-org.apache.storm.hooks.IWorkerHook-.
>>>
>>> 2017-09-13 15:30 GMT+02:00 zhangwenwei <[email protected]>:
>>>
>>>> According to the log, an NPE occurs when calling
>>>> kieContainer.newKieSession().
>>>>
>>>> Best Regards,
>>>> Jerry Zhang
>>>>
>>>> > On 13 Sep 2017, at 14:15, 张博 <[email protected]> wrote:
>>>> >
>>>> > Hi!
>>>> > I want to use Drools in a bolt. It works normally in the LocalCluster,
>>>> > but when I deploy it to the production cluster, it fails with an error.
>>>> > The bolt:
>>>> > public class DealLostBolt extends BaseRichBolt {
>>>> >
>>>> > private static final long serialVersionUID = 1L;
>>>> >
>>>> > private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
>>>> >
>>>> > private OutputCollector collector;
>>>> >
>>>> > private KieSession kieSession;
>>>> >
>>>> > private FactHandle factHandle;
>>>> >
>>>> > @Override
>>>> > public void execute(Tuple input) {
>>>> > // Get the data
>>>> > String sentence = (String) input.getValue(0);
>>>> > LOGGER.info("Data received by DealLostBolt: " + sentence);
>>>> >
>>>> > // Convert the data
>>>> > PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
>>>> >
>>>> > KieServices ks = KieServices.Factory.get();
>>>> > KieContainer kieContainer = ks.getKieClasspathContainer();
>>>> > kieSession = kieContainer.newKieSession("all-rule");
>>>> > kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
>>>> >
>>>> > factHandle = kieSession.insert(dataPoint);
>>>> > kieSession.fireAllRules();
>>>> > kieSession.delete(factHandle);
>>>> >
>>>> > collector.emit(new Values(sentence));
>>>> > }
>>>> >
>>>> > @Override
>>>> > public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>> > declarer.declare(new Fields("value"));
>>>> >
>>>> > }
>>>> >
>>>> > @Override
>>>> > public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
>>>> > this.collector = collector;
>>>> > }
>>>> >
>>>> > }
>>>> > The errors:
>>>> > java.lang.RuntimeException: java.lang.NullPointerException
>>>> > at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
>>>> > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>> > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
>>>> > Caused by: java.lang.NullPointerException
>>>> > at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
>>>> > at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
>>>> > ... 6 more
>>>> >
>>>> > Could somebody help me?
>>>> >
>>>> > Thanks!
>>>>
>>>>
>>>
>>
>