I know this is very late to jump in, but if you are integrating with
a system that already uses Postgres as a message broker/queue, why
not poll your Postgres queue directly from nextTuple() instead of pushing
everything onto another queue? Just use JDBC to connect to Postgres
from nextTuple().
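
A minimal sketch of what I mean, assuming a queue table with an increasing
id column. The table name message_queue, its columns id and payload, and all
class names here are made up for illustration; your schema will differ. The
spout's nextTuple() would call next() and emit whatever comes back.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Polling logic that a spout's nextTuple() could delegate to (illustrative only). */
final class PostgresQueuePoller {

    /** One queue row: an increasing id plus the message body. */
    static final class Row {
        final long id;
        final String payload;

        Row(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Fetches up to limit rows whose id is greater than lastId, oldest first. */
    interface RowFetcher {
        List<Row> fetchAfter(long lastId, int limit);
    }

    /** JDBC-backed fetcher; the table and column names are hypothetical. */
    static final class JdbcRowFetcher implements RowFetcher {
        private static final String SQL =
                "SELECT id, payload FROM message_queue WHERE id > ? ORDER BY id LIMIT ?";
        private final Connection connection;

        JdbcRowFetcher(Connection connection) {
            this.connection = connection;
        }

        @Override
        public List<Row> fetchAfter(long lastId, int limit) {
            List<Row> rows = new ArrayList<Row>();
            try (PreparedStatement stmt = connection.prepareStatement(SQL)) {
                stmt.setLong(1, lastId);
                stmt.setInt(2, limit);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        rows.add(new Row(rs.getLong("id"), rs.getString("payload")));
                    }
                }
            } catch (SQLException e) {
                throw new IllegalStateException("Polling the queue table failed", e);
            }
            return rows;
        }
    }

    private final RowFetcher fetcher;
    private final int batchSize;
    private final Queue<Row> buffer = new ArrayDeque<Row>();
    private long lastId = 0L;

    PostgresQueuePoller(RowFetcher fetcher, int batchSize) {
        this.fetcher = fetcher;
        this.batchSize = batchSize;
    }

    /** Returns the next payload, or null when there is nothing new to emit. */
    String next() {
        if (buffer.isEmpty()) {
            // Refill from the database only when the local buffer is drained.
            for (Row row : fetcher.fetchAfter(lastId, batchSize)) {
                buffer.add(row);
                lastId = row.id;
            }
        }
        Row row = buffer.poll();
        return row == null ? null : row.payload;
    }
}
```

In a real spout you would open the Connection in open() and only treat a row
as consumed once its tuple is acked; this sketch keeps lastId in memory and
does not attempt either.
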
Best,
Marc
On Thu, Jul 17, 2014 at 02:18:33PM -0700, Sa Li wrote:
> Hello, Robert
> This is a follow-up to the last thread; I am just getting back to this topic
> after other work. As you suggested, I have written a topology to run. Basically
> I get a JSON object from the Kafka producer, and the object looks like this:
>
> {"messagetype":"PageView","time":1402437708,"totaltime":9,"pagename":"user.aspx","profileid":69781139,"userid":76888177}
>
> I want to be able to retrieve the time and userid fields for future use.
> Here is the code I wrote:
>
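> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.StormSubmitter;
> import backtype.storm.generated.StormTopology;
> import backtype.storm.spout.SchemeAsMultiScheme;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> import org.json.JSONException;
> import org.json.JSONObject;
> import storm.kafka.BrokerHosts;
> import storm.kafka.StringScheme;
> import storm.kafka.ZkHosts;
> import storm.kafka.trident.OpaqueTridentKafkaSpout;
> import storm.kafka.trident.TridentKafkaConfig;
> import storm.trident.TridentTopology;
> import storm.trident.operation.BaseFunction;
> import storm.trident.operation.TridentCollector;
> import storm.trident.tuple.TridentTuple;
>
> public class ingestTopology {
>
>     protected ingestTopology() {
>         throw new UnsupportedOperationException();
>     }
>
>     /** Parses each Kafka message as JSON and emits its "time" and "userid". */
>     public static class JsonObjectParse extends BaseFunction {
>         @Override
>         public final void execute(
>                 final TridentTuple tuple,
>                 final TridentCollector collector) {
>             // StringScheme already decodes the message, so field 0 is a
>             // String; getBinary(0) would fail with a ClassCastException.
>             String decoded = tuple.getString(0);
>             try {
>                 JSONObject json = new JSONObject(decoded);
>                 // Both fields are JSON numbers; getString() throws on them.
>                 collector.emit(new Values(
>                         json.getLong("time"),
>                         json.getLong("userid")));
>             } catch (JSONException e) {
>                 System.err.println("Caught JSONException: " + e.getMessage());
>             }
>         }
>     }
>
>     public static StormTopology buildTopology() {
>         try {
>             TridentTopology topology = new TridentTopology();
>             BrokerHosts zk = new ZkHosts("localhost");
>             TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>             spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>             OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>
>             topology.newStream("spout1", spout)
>                     .each(new Fields("str"), new JsonObjectParse(),
>                             new Fields("time", "userid"));
>
>             return topology.build();
>         } catch (IllegalArgumentException e) {
>             System.err.println("Caught IllegalArgumentException: " + e.getMessage());
>         }
>         return null;
>     }
>
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         conf.setDebug(true);
>
>         if (args != null && args.length > 0) {
>             // A topology name was given: submit to a real cluster.
>             conf.setNumWorkers(3);
>             StormSubmitter.submitTopology(args[0], conf, buildTopology());
>         } else {
>             // No arguments: run in-process for local testing.
>             conf.setMaxSpoutPending(1);
>             conf.setMaxTaskParallelism(3);
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafka", conf, buildTopology());
>
>             // 100 ms is too short for the topology to start; wait longer.
>             Thread.sleep(60000);
>             cluster.shutdown();
>         }
>     }
> }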
>
> However, I got tons of errors:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/
> slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/
> slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/ch/qos/logback/
> logback-classic/1.0.6/logback-classic-1.0.6.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
> [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=
> 192.168.128.10
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=
> 1.6.0_65
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=
> Apple Inc.
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=/
> System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
> [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.class.path
> =/workspace/tools/stormprj/kafka-producers/target/test-classes:/workspace/tools
> /stormprj/kafka-producers/target/classes:/Users/sali/.m2/repository/javax/
> servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/sali/.m2/repository/org/
> twitter4j/twitter4j-stream/3.0.5/twitter4j-stream-3.0.5.jar:/Users/sali/.m2/
> repository/org/twitter4j/twitter4j-core/3.0.5/twitter4j-core-3.0.5.jar:/Users/
> sali/.m2/repository/net/sf/opencsv/opencsv/2.1/opencsv-2.1.jar:/Users/sali/.m2/
> repository/org/json/json/20140107/json-20140107.jar:/Users/sali/.m2/repository/
> org/slf4j/slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar:/Users/sali/.m2/repository/
> org/slf4j/slf4j-api/1.7.2/slf4j-api-1.7.2.jar:/Users/sali/.m2/repository/log4j/
> log4j/1.2.17/log4j-1.2.17.jar:/Users/sali/.m2/repository/org/scala-lang/
> scala-library/2.9.2/scala-library-2.9.2.jar:/Users/sali/.m2/repository/org/
> apache/zookeeper/zookeeper/3.4.5/zookeeper-3.4.5.jar:/Users/sali/.m2/repository
> /org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/sali/.m2/
> repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/sali/.m2/repository/junit
> /junit/4.10/junit-4.10.jar:/Users/sali/.m2/repository/org/jboss/netty/netty/
> 3.2.2.Final/netty-3.2.2.Final.jar:/Users/sali/.m2/repository/org/apache/kafka/
> kafka_2.9.2/0.8.0/kafka_2.9.2-0.8.0.jar:/Users/sali/.m2/repository/net/sf/
> jopt-simple/jopt-simple/3.2/jopt-simple-3.2.jar:/Users/sali/.m2/repository/org/
> scala-lang/scala-compiler/2.9.2/scala-compiler-2.9.2.jar:/Users/sali/.m2/
> repository/com/101tec/zkclient/0.3/zkclient-0.3.jar:/Users/sali/.m2/repository/
> org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/Users/sali/.m2/
> repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/
> sali/.m2/repository/com/yammer/metrics/metrics-annotation/2.2.0/
> metrics-annotation-2.2.0.jar:/Users/sali/.m2/repository/org/apache/storm/
> storm-kafka/0.9.2-incubating/storm-kafka-0.9.2-incubating.jar:/Users/sali/.m2/
> repository/org/apache/curator/curator-framework/2.4.0/
> curator-framework-2.4.0.jar:/Users/sali/.m2/repository/org/apache/curator/
> curator-client/2.4.0/curator-client-2.4.0.jar:/Users/sali/.m2/repository/com/
> google/guava/guava/15.0/guava-15.0.jar:/Users/sali/.m2/repository/org/testng/
> testng/6.8.5/testng-6.8.5.jar:/Users/sali/.m2/repository/org/hamcrest/
> hamcrest-core/1.1/hamcrest-core-1.1.jar:/Users/sali/.m2/repository/org/
> beanshell/bsh/2.0b4/bsh-2.0b4.jar:/Users/sali/.m2/repository/com/beust/
> jcommander/1.27/jcommander-1.27.jar:/Users/sali/.m2/repository/org/yaml/
> snakeyaml/1.6/snakeyaml-1.6.jar:/Users/sali/.m2/repository/org/mockito/
> mockito-all/1.9.0/mockito-all-1.9.0.jar:/Users/sali/.m2/repository/org/
> easytesting/fest-assert-core/2.0M8/fest-assert-core-2.0M8.jar:/Users/sali/.m2/
> repository/org/easytesting/fest-util/1.2.3/fest-util-1.2.3.jar:/Users/sali/.m2/
> repository/org/jmock/jmock/2.6.0/jmock-2.6.0.jar:/Users/sali/.m2/repository/org
> /hamcrest/hamcrest-library/1.1/hamcrest-library-1.1.jar:/Users/sali/.m2/
> repository/storm/storm/0.9.0.1/storm-0.9.0.1.jar:/Users/sali/.m2/repository/
> storm/storm-console-logging/0.9.0.1/storm-console-logging-0.9.0.1.jar:/Users/
> sali/.m2/repository/storm/storm-core/0.9.0.1/storm-core-0.9.0.1.jar:/Users/sali
> /.m2/repository/org/clojure/clojure/1.4.0/clojure-1.4.0.jar:/Users/sali/.m2/
> repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/Users/sali/.m2/
> repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/sali
> /.m2/repository/storm/libthrift7/0.7.0-2/libthrift7-0.7.0-2.jar:/Users/sali/.m2
> /repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/sali/.m2/
> repository/org/apache/httpcomponents/httpclient/4.1.1/httpclient-4.1.1.jar:/
> Users/sali/.m2/repository/org/apache/httpcomponents/httpcore/4.1/
> httpcore-4.1.jar:/Users/sali/.m2/repository/commons-logging/commons-logging/
> 1.1.1/commons-logging-1.1.1.jar:/Users/sali/.m2/repository/commons-codec/
> commons-codec/1.4/commons-codec-1.4.jar:/Users/sali/.m2/repository/clj-time/
> clj-time/0.4.1/clj-time-0.4.1.jar:/Users/sali/.m2/repository/joda-time/
> joda-time/2.0/joda-time-2.0.jar:/Users/sali/.m2/repository/com/netflix/curator/
> curator-framework/1.0.1/curator-framework-1.0.1.jar:/Users/sali/.m2/repository/
> com/netflix/curator/curator-client/1.0.1/curator-client-1.0.1.jar:/Users/sali
> /.m2/repository/backtype/jzmq/2.1.0/jzmq-2.1.0.jar:/Users/sali/.m2/repository/
> com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/sali/.m2/
> repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/sali/.m2/
> repository/org/clojure/core.incubator/0.1.0/core.incubator-0.1.0.jar:/Users/
> sali/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/
> sali/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/sali/.m2/
> repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/sali/.m2/repository/
> commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users
> /sali/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/sali/.m2/
> repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/sali/.m2/
> repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/
> sali/.m2/repository/ring/ring-jetty-adapter/0.3.11/
> ring-jetty-adapter-0.3.11.jar:/Users/sali/.m2/repository/ring/ring-servlet/
> 0.3.11/ring-servlet-0.3.11.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty/6.1.26/jetty-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/
> jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/sali/.m2/
> repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/sali
> /.m2/repository/org/clojure/math.numeric-tower/0.0.1/
> math.numeric-tower-0.0.1.jar:/Users/sali/.m2/repository/storm/carbonite/1.5.0/
> carbonite-1.5.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/kryo/kryo/
> 2.17/kryo-2.17.jar:/Users/sali/.m2/repository/com/esotericsoftware/reflectasm/
> reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sali/.m2/repository/org/ow2/
> asm/asm/4.0/asm-4.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/minlog/
> minlog/1.2/minlog-1.2.jar:/Users/sali/.m2/repository/org/objenesis/objenesis/
> 1.2/objenesis-1.2.jar:/Users/sali/.m2/repository/storm/tools.cli/0.2.2/
> tools.cli-0.2.2.jar:/Users/sali/.m2/repository/com/googlecode/disruptor/
> disruptor/2.10.1/disruptor-2.10.1.jar:/Users/sali/.m2/repository/storm/jgrapht/
> 0.8.3/jgrapht-0.8.3.jar:/Users/sali/.m2/repository/ch/qos/logback/
> logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/sali/.m2/repository/ch/
> qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/sali/.m2/
> repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/
> sali/.m2/repository/storm/storm-netty/0.9.0.1/storm-netty-0.9.0.1.jar:/Users/
> sali/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/
> sali/.m2/repository/commons-collections/commons-collections/3.2.1/
> commons-collections-3.2.1.jar
> [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/
> Extensions:/usr/lib/java
> [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.io.tmpdir=
> /var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T/
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=
> <NA>
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Mac OS
> X
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=
> 10.9.4
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=sali
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/
> Users/sali
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/
> workspace/tools/stormprj/kafka-producers
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:host.name=192.168.128.10
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.version=1.6.0_65
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.vendor=Apple Inc.
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/
> Contents/Home
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.class.path=/workspace/tools/stormprj/kafka-producers/target/
> test-classes:/workspace/tools/stormprj/kafka-producers/target/classes:/Users/
> sali/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/
> sali/.m2/repository/org/twitter4j/twitter4j-stream/3.0.5/
> twitter4j-stream-3.0.5.jar:/Users/sali/.m2/repository/org/twitter4j/
> twitter4j-core/3.0.5/twitter4j-core-3.0.5.jar:/Users/sali/.m2/repository/net/sf
> /opencsv/opencsv/2.1/opencsv-2.1.jar:/Users/sali/.m2/repository/org/json/json/
> 20140107/json-20140107.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-simple/
> 1.7.2/slf4j-simple-1.7.2.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-api/
> 1.7.2/slf4j-api-1.7.2.jar:/Users/sali/.m2/repository/log4j/log4j/1.2.17/
> log4j-1.2.17.jar:/Users/sali/.m2/repository/org/scala-lang/scala-library/2.9.2/
> scala-library-2.9.2.jar:/Users/sali/.m2/repository/org/apache/zookeeper/
> zookeeper/3.4.5/zookeeper-3.4.5.jar:/Users/sali/.m2/repository/org/slf4j/
> slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/sali/.m2/repository/jline/
> jline/0.9.94/jline-0.9.94.jar:/Users/sali/.m2/repository/junit/junit/4.10/
> junit-4.10.jar:/Users/sali/.m2/repository/org/jboss/netty/netty/3.2.2.Final/
> netty-3.2.2.Final.jar:/Users/sali/.m2/repository/org/apache/kafka/kafka_2.9.2/
> 0.8.0/kafka_2.9.2-0.8.0.jar:/Users/sali/.m2/repository/net/sf/jopt-simple/
> jopt-simple/3.2/jopt-simple-3.2.jar:/Users/sali/.m2/repository/org/scala-lang/
> scala-compiler/2.9.2/scala-compiler-2.9.2.jar:/Users/sali/.m2/repository/com/
> 101tec/zkclient/0.3/zkclient-0.3.jar:/Users/sali/.m2/repository/org/xerial/
> snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/Users/sali/.m2/repository/
> com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/sali/.m2/
> repository/com/yammer/metrics/metrics-annotation/2.2.0/
> metrics-annotation-2.2.0.jar:/Users/sali/.m2/repository/org/apache/storm/
> storm-kafka/0.9.2-incubating/storm-kafka-0.9.2-incubating.jar:/Users/sali/.m2/
> repository/org/apache/curator/curator-framework/2.4.0/
> curator-framework-2.4.0.jar:/Users/sali/.m2/repository/org/apache/curator/
> curator-client/2.4.0/curator-client-2.4.0.jar:/Users/sali/.m2/repository/com/
> google/guava/guava/15.0/guava-15.0.jar:/Users/sali/.m2/repository/org/testng/
> testng/6.8.5/testng-6.8.5.jar:/Users/sali/.m2/repository/org/hamcrest/
> hamcrest-core/1.1/hamcrest-core-1.1.jar:/Users/sali/.m2/repository/org/
> beanshell/bsh/2.0b4/bsh-2.0b4.jar:/Users/sali/.m2/repository/com/beust/
> jcommander/1.27/jcommander-1.27.jar:/Users/sali/.m2/repository/org/yaml/
> snakeyaml/1.6/snakeyaml-1.6.jar:/Users/sali/.m2/repository/org/mockito/
> mockito-all/1.9.0/mockito-all-1.9.0.jar:/Users/sali/.m2/repository/org/
> easytesting/fest-assert-core/2.0M8/fest-assert-core-2.0M8.jar:/Users/sali/.m2/
> repository/org/easytesting/fest-util/1.2.3/fest-util-1.2.3.jar:/Users/sali/.m2/
> repository/org/jmock/jmock/2.6.0/jmock-2.6.0.jar:/Users/sali/.m2/repository/org
> /hamcrest/hamcrest-library/1.1/hamcrest-library-1.1.jar:/Users/sali/.m2/
> repository/storm/storm/0.9.0.1/storm-0.9.0.1.jar:/Users/sali/.m2/repository/
> storm/storm-console-logging/0.9.0.1/storm-console-logging-0.9.0.1.jar:/Users/
> sali/.m2/repository/storm/storm-core/0.9.0.1/storm-core-0.9.0.1.jar:/Users/sali
> /.m2/repository/org/clojure/clojure/1.4.0/clojure-1.4.0.jar:/Users/sali/.m2/
> repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/Users/sali/.m2/
> repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/sali
> /.m2/repository/storm/libthrift7/0.7.0-2/libthrift7-0.7.0-2.jar:/Users/sali/.m2
> /repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/sali/.m2/
> repository/org/apache/httpcomponents/httpclient/4.1.1/httpclient-4.1.1.jar:/
> Users/sali/.m2/repository/org/apache/httpcomponents/httpcore/4.1/
> httpcore-4.1.jar:/Users/sali/.m2/repository/commons-logging/commons-logging/
> 1.1.1/commons-logging-1.1.1.jar:/Users/sali/.m2/repository/commons-codec/
> commons-codec/1.4/commons-codec-1.4.jar:/Users/sali/.m2/repository/clj-time/
> clj-time/0.4.1/clj-time-0.4.1.jar:/Users/sali/.m2/repository/joda-time/
> joda-time/2.0/joda-time-2.0.jar:/Users/sali/.m2/repository/com/netflix/curator/
> curator-framework/1.0.1/curator-framework-1.0.1.jar:/Users/sali/.m2/repository/
> com/netflix/curator/curator-client/1.0.1/curator-client-1.0.1.jar:/Users/sali
> /.m2/repository/backtype/jzmq/2.1.0/jzmq-2.1.0.jar:/Users/sali/.m2/repository/
> com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/sali/.m2/
> repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/sali/.m2/
> repository/org/clojure/core.incubator/0.1.0/core.incubator-0.1.0.jar:/Users/
> sali/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/
> sali/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/sali/.m2/
> repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/sali/.m2/repository/
> commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users
> /sali/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/sali/.m2/
> repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/sali/.m2/
> repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/
> sali/.m2/repository/ring/ring-jetty-adapter/0.3.11/
> ring-jetty-adapter-0.3.11.jar:/Users/sali/.m2/repository/ring/ring-servlet/
> 0.3.11/ring-servlet-0.3.11.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty/6.1.26/jetty-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/
> jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/sali/.m2/
> repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/sali
> /.m2/repository/org/clojure/math.numeric-tower/0.0.1/
> math.numeric-tower-0.0.1.jar:/Users/sali/.m2/repository/storm/carbonite/1.5.0/
> carbonite-1.5.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/kryo/kryo/
> 2.17/kryo-2.17.jar:/Users/sali/.m2/repository/com/esotericsoftware/reflectasm/
> reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sali/.m2/repository/org/ow2/
> asm/asm/4.0/asm-4.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/minlog/
> minlog/1.2/minlog-1.2.jar:/Users/sali/.m2/repository/org/objenesis/objenesis/
> 1.2/objenesis-1.2.jar:/Users/sali/.m2/repository/storm/tools.cli/0.2.2/
> tools.cli-0.2.2.jar:/Users/sali/.m2/repository/com/googlecode/disruptor/
> disruptor/2.10.1/disruptor-2.10.1.jar:/Users/sali/.m2/repository/storm/jgrapht/
> 0.8.3/jgrapht-0.8.3.jar:/Users/sali/.m2/repository/ch/qos/logback/
> logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/sali/.m2/repository/ch/
> qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/sali/.m2/
> repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/
> sali/.m2/repository/storm/storm-netty/0.9.0.1/storm-netty-0.9.0.1.jar:/Users/
> sali/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/
> sali/.m2/repository/commons-collections/commons-collections/3.2.1/
> commons-collections-3.2.1.jar
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/
> Extensions:/usr/lib/java
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.io.tmpdir=/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T/
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.compiler=<NA>
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:os.name=Mac OS X
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:os.arch=x86_64
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:os.version=10.9.4
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:user.name=sali
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:user.home=/Users/sali
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:user.dir=/workspace/tools/stormprj/kafka-producers
> Exception in thread "main" java.lang.ExceptionInInitializerError
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$require.doInvoke(core.clj:5381)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at backtype.storm.cluster$loading__4784__auto__.invoke(cluster.clj:1)
> at backtype.storm.cluster__init.load(Unknown Source)
> at backtype.storm.cluster__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$require.doInvoke(core.clj:5381)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at backtype.storm.daemon.nimbus__init.load(Unknown Source)
> at backtype.storm.daemon.nimbus__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$require.doInvoke(core.clj:5381)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at backtype.storm.testing$loading__4784__auto__.invoke(testing.clj:1)
> at backtype.storm.testing__init.load(Unknown Source)
> at backtype.storm.testing__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:605)
> at clojure.core$use.doInvoke(core.clj:5392)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at
> backtype.storm.LocalCluster$loading__4784__auto__.invoke(LocalCluster.clj:1)
> at backtype.storm.LocalCluster__init.load(Unknown Source)
> at backtype.storm.LocalCluster__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.lang.Var.invoke(Var.java:415)
> at backtype.storm.LocalCluster.<clinit>(Unknown Source)
> at storm.ingestTopology.main(ingestTopology.java:126)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.zookeeper.server.NIOServerCnxn$Factory
> at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:171)
> at backtype.storm.zookeeper$loading__4784__auto__.invoke(zookeeper.clj:1)
> at backtype.storm.zookeeper__init.load(Unknown Source)
> at backtype.storm.zookeeper__init.<clinit>(Unknown Source)
> ... 90 more
>
> I poked around for this error (java.lang.ClassNotFoundException:
> org.apache.zookeeper.server.NIOServerCnxn$Factory) and made changes to the
> pom, but there seems to be no solid solution. Someone suggested downgrading
> ZooKeeper, but I would rather not do that. Is there any other solution?
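
On the ClassNotFoundException: this looks like a version mismatch rather
than something a small pom tweak will fix. The classpath above lists both
storm-core-0.9.0.1 and zookeeper-3.4.5, and as far as I know ZooKeeper 3.4.x
replaced the nested NIOServerCnxn.Factory class with a top-level
NIOServerCnxnFactory, so code built against the older API cannot find it.
A small probe like the following can confirm which one your classpath
provides. The class and method names are mine and purely illustrative.

```java
/** Reports which ZooKeeper server-factory class the current classpath provides. */
final class ZooKeeperClassProbe {

    // Nested factory class that the stack trace above failed to load.
    static final String OLD_FACTORY = "org.apache.zookeeper.server.NIOServerCnxn$Factory";
    // Top-level factory class shipped by ZooKeeper 3.4.x.
    static final String NEW_FACTORY = "org.apache.zookeeper.server.NIOServerCnxnFactory";

    /** True if the named class can be located; the class is not initialized. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ZooKeeperClassProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (LinkageError e) {
            return false;
        }
    }

    /** Summarizes which of the two factory classes is visible. */
    static String describe() {
        if (isPresent(OLD_FACTORY)) {
            return "older API present: " + OLD_FACTORY;
        }
        if (isPresent(NEW_FACTORY)) {
            return "only the 3.4.x API is present: " + NEW_FACTORY;
        }
        return "no ZooKeeper server classes found on the classpath";
    }
}
```

Print ZooKeeperClassProbe.describe() at the top of main, before LocalCluster
is created. If only the 3.4.x class shows up, the options I would try are
moving storm-core to the same 0.9.2-incubating release as your storm-kafka
dependency, or excluding the newer ZooKeeper so the version Storm 0.9.0.1
expects is used. I have not tested either against your project.
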
>
>
> thanks
>
>
> Alec
>
>
> On Jul 7, 2014, at 5:04 PM, Sa Li <[email protected]> wrote:
>
>
> great! Robert, will start from there.
>
>
> On Jul 7, 2014, at 11:14 AM, Robert Lee <[email protected]> wrote:
>
>
> Alec,
>
> Check out the very nicely compiled storm-kafka module within Storm that
> has been developed by wurstmeister
> (https://github.com/apache/incubator-storm/tree/master/external/storm-kafka).
> For a quick start add the following to your pom file:
> <dependency>
> <artifactId>storm-kafka</artifactId>
> <groupId>org.apache.storm</groupId>
> <version>0.9.2-incubating</version>
> </dependency>
>
> And within your main code set up a kafka spout with the following
> code:
>
>
> TridentTopology topology = new TridentTopology();
> BrokerHosts zk = new ZkHosts("localhost");
> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>
> topology.newStream("kafka", spout).shuffle()
>     .each(new Fields("str"), new FunctionThatWorksOnKafkaOutputMessage(),
>           new Fields("yourField"))
>
> ......
>
>
> Your first function on the topology after creating the spout stream
> will take the message and create whatever data field you want, so that
> you can operate on the PostgreSQL data.
>
> Hope that gives you a quick start,
> Robert
>
>
> On Mon, Jul 7, 2014 at 1:40 PM, Sa Li <[email protected]> wrote:
>
> Hello, Robert
>
> As you mentioned in the last thread, I downloaded your Kafka code, which
> was very useful. I have already implemented a Kafka producer that gets
> data from PostgreSQL and sends it to the brokers. By checking
>
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ingest_test --from-beginning
>
>
> I know the consumer can receive the data. Now I would like to integrate
> the Kafka producer into Storm. I am trying to understand the logic: the
> Storm spout is supposed to act as the consumer, and _collector will get
> the data sent by the Kafka producer, is that right?
>
> I hope there is some sample code available to use.
>
> Thanks
>
> Alec
>
>
> On Jun 27, 2014, at 11:58 AM, Robert Lee <[email protected]> wrote:
>
>
> I always like to simplify things. If I were you, I would use
> the well-known and widely used Kafka spout to ingest data into your
> Storm cluster. Simply write a Kafka producer that uses the
> Postgres Java driver to pull out your required data and send it
> as a message. You'll find it is pretty easy to write Kafka
> producers. Check out my project with some simple
> producers and just mirror that to write your Postgres producer:
>
> https://github.com/leerobert/kafka-producers
>
>
> On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <[email protected]>
> wrote:
>
> Thanks a lot, John. The entire project gets data from
> PostgreSQL and finally emits to and updates Cassandra tables.
> With Robert's help in this group, I think I have some
> resources for the storm-cassandra integration. However, there
> are not many tutorials on Postgres with Storm;
> 'storm-rdbms' is the only example I can find for db ->
> storm. It would be great if someone could contribute more
> example code for postgres-storm. Sorry for the shameless
> request from a new Storm user.
>
>
> Thanks
>
> Alec
> On Jun 27, 2014, at 5:53 AM, John Welcher <
> [email protected]> wrote:
>
>
> Hi
>
> We use Postgres notifications. The spout open method
> registers for database notifications (add, update,
> delete). Each time the spout next method is called we
> check for pending notifications and process
> accordingly.
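
For anyone following along, the open/next split John describes might look
roughly like this. NotificationSource is a hypothetical stand-in for the
database API, not a real driver class; with the PostgreSQL JDBC driver I
believe the equivalent pieces are a LISTEN statement and
PGConnection.getNotifications(). All names below are illustrative.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Sketch of the open()/nextTuple() split for a notification-driven spout. */
final class NotificationSpoutLogic {

    /** Hypothetical stand-in for the database's notification API. */
    interface NotificationSource {
        /** Registers interest in one channel (for Postgres, a LISTEN). */
        void listen(String channel);

        /** Returns and clears whatever arrived since the last call. */
        List<String> drainPending();
    }

    private final NotificationSource source;
    private final Queue<String> backlog = new ArrayDeque<String>();

    NotificationSpoutLogic(NotificationSource source) {
        this.source = source;
    }

    /** Called once, as a spout's open() would be. */
    void open(String... channels) {
        for (String channel : channels) {
            source.listen(channel);
        }
    }

    /** Called repeatedly, as nextTuple() would be; returns one payload or null. */
    String nextTuple() {
        // Pull in anything new, then hand out one item per call.
        backlog.addAll(source.drainPending());
        return backlog.poll();
    }
}
```

Returning one payload per call keeps each nextTuple() invocation short;
anything else that arrived waits in the in-memory backlog until the next call.
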
>
> Good Luck
>
> John
>
>
> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <
> [email protected]> wrote:
>
> Dear all
>
> I am implementing a spout whose stream comes from a PostgreSQL
> ingress API (an in-house project). All I know for now is to
> connect the spout to PostgreSQL, retrieve the data
> periodically, store it in a queue, and then emit it to the
> topology. Has anyone done a similar job? I hope to get some
> instructions and details from you.
>
>
> thanks
>
> Alec
>