Hi, Susheel
Here is the conn,
public class postgresConnector {
private class Events {
public int id;
public String event_object;
}
public void connectIngress() {
Connection conn = null;
conn = connectToDatabaseOrDie();
}
public Connection connectToDatabaseOrDie() {
Connection conn = null;
try {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://10.100.70.84:5432/ingest";
conn = DriverManager.getConnection(url, "nn", "nn");
System.out.println("DB connected .....");
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1);
} catch (SQLException e)
{
e.printStackTrace();
System.exit(2);
}
return conn;
}
}
I change the writeDB as
public static class WriteDB extends BaseFunction {
PreparedStatement ps = null ;
postgresConnector connector = new postgresConnector() ;
Connection conn = connector.connectToDatabaseOrDie();
@Override
public final void execute(final TridentTuple tuple, final
TridentCollector collector) {
int user = tuple.getInteger(0);
String value = tuple.getString(1);
final StringBuilder queryBuilder = new StringBuilder()
.append("INSERT INTO test.state(userid, event) VALUES(")
.append(user)
.append(", '")
.append(value)
.append("')");
try {
ps = conn.prepareStatement(queryBuilder.toString()) ;
ps.execute();
collector.emit(new Values(tuple.getStringByField("event")));
} catch (SQLException ex) {
System.err.println("Caught IOException: " +
ex.getMessage());
}finally {
if (ps != null) {
try {
ps.close();
} catch (SQLException ex) {
}
}
}
}
}
DB connected, but I got such error, I am sure what is wrong with the
connection.
DB connected .....
3985 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory -
Thread Thread[main,5,main] died
java.lang.RuntimeException: java.io.NotSerializableException:
org.postgresql.jdbc4.Jdbc4Connection
at
backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:43)
~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.utils.Utils.serialize(Utils.java:85)
~[storm-core-0.9.3.jar:0.9.3]
at
backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106)
~[storm-core-0.9.3.jar:0.9.3]
at
storm.trident.topology.TridentTopologyBuilder.buildTopology(TridentTopologyBuilder.java:246)
~[storm-core-0.9.3.jar:0.9.3]
at storm.trident.TridentTopology.build(TridentTopology.java:425)
~[storm-core-0.9.3.jar:0.9.3]
at
storm.ingress.KafkaIngressTopology.buildTridentKafkaTopology(KafkaIngressTopology.java:292)
~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at
storm.ingress.KafkaIngressTopology.main(KafkaIngressTopology.java:318)
~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
Caused by: java.io.NotSerializableException:
org.postgresql.jdbc4.Jdbc4Connection
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
~[na:1.7.0_65]
at java.util.HashMap.writeObject(HashMap.java:1128) ~[na:1.7.0_65]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.7.0_65]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
~[na:1.7.0_65]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[na:1.7.0_65]
at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_65]
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
~[na:1.7.0_65]
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
~[na:1.7.0_65]
at
backtype.storm.serialization.DefaultSerializationDelegate.serialize(DefaultSerializationDelegate.java:39)
~[storm-core-0.9.3.jar:0.9.3]
... 6 common frames omitted
Thanks
Alec
On Fri, Dec 5, 2014 at 9:50 PM, Susheel Kumar Gadalay <[email protected]>
wrote:
> It is giving NPE in execute method of WriteDB execute method.
>
> You have declared
> private Connection conn = null ;
>
> and used it in execute
> ps = conn.prepareStatement(queryBuilder.toString()) ;
>
> Where is conn initialized?
>
> On 12/6/14, Sa Li <[email protected]> wrote:
> > Hi, all
> >
> > I wrote a writeDB function to write tuples into database,
> >
> > public static class WriteDB extends BaseFunction {
> > private Connection conn = null ;
> > PreparedStatement ps = null;
> >
> > @Override
> > public final void execute(final TridentTuple tuple, final
> > TridentCollector collector) {
> > int user = tuple.getInteger(0);
> > String value = tuple.getString(1);
> > final StringBuilder queryBuilder = new
> > StringBuilder()
> > .append("INSERT INTO test.state(userid,
> > event) VALUES(")
> > .append(user)
> > .append(", '")
> > .append(value)
> > .append("')");
> > System.out.println(queryBuilder.toString());
> > try {
> > ps =
> > conn.prepareStatement(queryBuilder.toString()) ;
> > ps.execute();
> > collector.emit(new
> > Values(tuple.getStringByField("event")));
> > }
> > catch (SQLException ex) {
> > System.err.println("Caught IOException: " +
> > ex.getMessage());
> > } finally {
> > if (ps != null) {
> > try {
> >
> > ps.close();
> > } catch
> > (SQLException ex) {
> > }
> > }
> > }
> > }
> > }
> >
> > This is the topology:
> > topology.newStream("topictestspout", kafkaSpout)
> > .each(new Fields("str"),
> > new JsonObjectParse(),
> > new Fields("userid","event"))
> > .each(new Fields("userid","event"),
> > new WriteDB(),
> > new Fields("events"));
> >
> > However, I am getting such error:
> > 7026 [Thread-12-b-0] ERROR backtype.storm.util - Async loop died!
> > java.lang.RuntimeException: java.lang.NullPointerException
> > at
> >
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> > Caused by: java.lang.NullPointerException: null
> > at
> >
> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> > at
> >
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> > at
> >
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> > storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > ... 6 common frames omitted
> > 7027 [Thread-12-b-0] ERROR backtype.storm.daemon.executor -
> > java.lang.RuntimeException: java.lang.NullPointerException
> > at
> >
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> > Caused by: java.lang.NullPointerException: null
> > at
> >
> storm.ingress.KafkaIngressTopology$WriteDB.execute(KafkaIngressTopology.java:146)
> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> > at
> >
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.trident.planner.processor.AppendCollector.emit(AppendCollector.java:50)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.ingress.KafkaIngressTopology$JsonObjectParse.execute(KafkaIngressTopology.java:122)
> > ~[kafka-storm-ingress-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> > at
> >
> storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> > storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> > ~[storm-core-0.9.3.jar:0.9.3]
> > ... 6 common frames omitted
> > 7231 [Thread-12-b-0] ERROR backtype.storm.util - Halting process:
> ("Worker
> > died")
> > java.lang.RuntimeException: ("Worker died")
> > at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
> > [storm-core-0.9.3.jar:0.9.3]
> > at clojure.lang.RestFn.invoke(RestFn.java:423)
> > [clojure-1.5.1.jar:na]
> > at
> > backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452)
> > [storm-core-0.9.3.jar:0.9.3]
> > at
> >
> backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240)
> > [storm-core-0.9.3.jar:0.9.3]
> > at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
> > [storm-core-0.9.3.jar:0.9.3]
> > at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> >
> > Any idea why that happen?
> >
> >
> > thanks
> >
> > Alec
> >
>