The NPE is thrown in the execute method of WriteDB. You have declared

    private Connection conn = null ;

and then used it in execute:

    ps = conn.prepareStatement(queryBuilder.toString()) ;

Where is conn initialized? Nothing in the code you posted assigns it, so it is still null when execute runs.

On 12/6/14, Sa Li <[email protected]> wrote:
> Hi, all
>
> I wrote a writeDB function to write tuples into database,
>
> public static class WriteDB extends BaseFunction {
>     private Connection conn = null ;
>     PreparedStatement ps = null;
>
>     @Override
>     public final void execute(final TridentTuple tuple, final TridentCollector collector) {
>         int user = tuple.getInteger(0);
>         String value = tuple.getString(1);
>         final StringBuilder queryBuilder = new StringBuilder()
>                 .append("INSERT INTO test.state(userid, event) VALUES(")
>                 .append(user)
>                 .append(", '")
>                 .append(value)
>                 .append("')");
>         System.out.println(queryBuilder.toString());
>         try {
>             ps = conn.prepareStatement(queryBuilder.toString()) ;
>             ps.execute();
>             collector.emit(new Values(tuple.getStringByField("event")));
>         }
>         catch (SQLException ex) {
>             System.err.println("Caught IOException: " + ex.getMessage());
>         } finally {
>             if (ps != null) {
>                 try {
>                     ps.close();
>                 } catch (SQLException ex) {
>                 }
>             }
>         }
>     }
> }
>
> This is the topology:
> topology.newStream("topictestspout", kafkaSpout)
>         .each(new Fields("str"),
>               new JsonObjectParse(),
>               new Fields("userid","event"))
>         .each(new Fields("userid","event"),
>               new WriteDB(),
>               new Fields("events"));
>
> However, I am getting such error:
> 7026 [Thread-12-b-0] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> Caused by: java.lang.NullPointerException: null
>     at storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146) ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>     at storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122) ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>     at storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>     ... 6 common frames omitted
> 7027 [Thread-12-b-0] ERROR backtype.storm.daemon.executor -
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> Caused by: java.lang.NullPointerException: null
>     at storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146) ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>     at storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122) ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>     at storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>     ... 6 common frames omitted
> 7231 [Thread-12-b-0] ERROR backtype.storm.util - Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>     at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>     at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>
> Any idea why that happen?
>
> thanks
>
> Alec
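
On where conn could be initialized: a Trident function is serialized when the topology is submitted and deserialized on the worker, so a live Connection cannot travel with the object. The usual place to open it is the prepare(Map, TridentOperationContext) hook that BaseFunction inherits, with the matching close in cleanup(). The sketch below illustrates only that lifecycle in plain Java, with no Storm or JDBC dependency. WriteDbSketch, FakeConnection and roundTrip are hypothetical stand-ins made up for this illustration; they are not Storm classes, and your real prepare() would open a JDBC connection instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class PrepareSketch {

    /** Hypothetical stand-in for java.sql.Connection; it only records statements. */
    static class FakeConnection {
        final List<String> executed = new ArrayList<>();

        void run(String sql) {
            executed.add(sql);
        }
    }

    /** Hypothetical stand-in for WriteDB, without the Storm base class. */
    static class WriteDbSketch implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: the live resource is not shipped with the serialized object
        private transient FakeConnection conn;

        /** Plays the role of the per-worker setup hook: open the resource here. */
        void prepare() {
            conn = new FakeConnection();
        }

        /** Fails with a clear message instead of a bare NPE when setup was skipped. */
        int execute(int user, String value) {
            if (conn == null) {
                throw new IllegalStateException("conn is null: prepare() was never called");
            }
            conn.run("insert user=" + user + " event=" + value);
            return conn.executed.size(); // number of statements run so far
        }
    }

    /** Serializes and deserializes the object, mimicking shipment to a worker. */
    static WriteDbSketch roundTrip(WriteDbSketch f) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (WriteDbSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        WriteDbSketch onWorker = roundTrip(new WriteDbSketch());
        try {
            onWorker.execute(1, "login"); // conn was never set, as in the posted code
        } catch (IllegalStateException e) {
            System.out.println("before prepare: " + e.getMessage());
        }
        onWorker.prepare();
        System.out.println("after prepare: " + onWorker.execute(1, "login") + " statement(s) run");
    }
}
```

In the real WriteDB the same shape would apply: mark conn transient, assign it in prepare(), close it in cleanup(), and leave execute() to use it. Binding user and value through PreparedStatement placeholders rather than string concatenation would also be worth considering there.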
