Alec,
Check out the very nicely put-together storm-kafka module within Storm that was
developed by wurstmeister (
https://github.com/apache/incubator-storm/tree/master/external/storm-kafka).
For a quick start, add the following to your pom file:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
And within your main code, set up a Kafka spout with the following code:
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

topology.newStream("kafka", spout).shuffle()
        .each(new Fields("str"), new FunctionThatWorksOnKafkaOutputMessage(),
              new Fields("yourField"))
        ......
Your first function on the topology, after the spout stream is created, takes
the Kafka message and emits whatever data fields you want so that you can
operate on the postgresql data.
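A rough sketch of what such a function might look like (the class name, the
output field, and the parsing logic are only placeholders, not part of
storm-kafka):

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Hypothetical Trident function that works on each Kafka message string.
public class FunctionThatWorksOnKafkaOutputMessage extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // "str" is the field emitted by the StringScheme of the Kafka spout.
        String message = tuple.getStringByField("str");
        // Parse the message however you serialized it on the producer side
        // (e.g. a CSV row pulled from postgresql) and emit the field(s)
        // you declared as new Fields("yourField").
        collector.emit(new Values(message.trim()));
    }
}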
Hope that gives you a quick start,
Robert
On Mon, Jul 7, 2014 at 1:40 PM, Sa Li <[email protected]> wrote:
> Hello, Robert
>
> As you mentioned in the last thread, I downloaded your kafka code, which was
> very useful. I have already implemented a kafka producer that gets data from
> postgresql and sends it to the brokers. By checking
>
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
> ingest_test --from-beginning
>
>
> I know the consumer can receive the data. Now I'd like to integrate the kafka
> producer with storm. I'm trying to understand the logic: the storm spout is
> supposed to function as a consumer, and _collector will get the data sent by
> the kafka producer, is that right?
>
> Hope there is some sample code available to use.
>
> Thanks
>
> Alec
>
>
> On Jun 27, 2014, at 11:58 AM, Robert Lee <[email protected]>
> wrote:
>
> I always like to simplify things. If I were you, I would use the well-known
> and widely used kafka spout to ingest data into your storm cluster. Simply
> write a Kafka producer that uses the postgresql JDBC driver to pull out your
> required data and send it as messages. You'll find it is pretty easy to write
> kafka producers. Check out my project with some simple producers and just
> mirror that to build your postgres producer:
>
> https://github.com/leerobert/kafka-producers
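>
> A bare-bones sketch of that kind of producer, using the old kafka 0.8
> producer API (the connection string, query, topic, and class name below are
> made up for illustration):
>
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.ResultSet;
> import java.util.Properties;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> public class PostgresKafkaProducer {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put("metadata.broker.list", "localhost:9092");
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         Producer<String, String> producer =
>                 new Producer<String, String>(new ProducerConfig(props));
>
>         // Pull the required rows out of postgresql with the JDBC driver
>         // and forward each one to the broker as a string message.
>         Connection conn = DriverManager.getConnection(
>                 "jdbc:postgresql://localhost/yourdb", "user", "pass");
>         ResultSet rs = conn.createStatement()
>                 .executeQuery("SELECT payload FROM your_table");
>         while (rs.next()) {
>             producer.send(new KeyedMessage<String, String>(
>                     "ingest_test", rs.getString("payload")));
>         }
>         producer.close();
>         conn.close();
>     }
> }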
>
>
> On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <[email protected]> wrote:
>
>> Thanks a lot, John. The entire project is about getting data from postgresql
>> and finally emitting to and updating cassandra tables. With the help of
>> Robert in this group, I think I have some resources on storm-cassandra
>> integration. However, there really aren't many tutorials on postgres with
>> storm; *storm-rdbms* is the only example I can find of db->storm. It would
>> be great if someone could contribute more example code on postgres-storm.
>> Sorry for the shameless request from a new storm user.
>>
>>
>> thanks
>>
>> Alec
>> On Jun 27, 2014, at 5:53 AM, John Welcher <[email protected]> wrote:
>>
>> Hi
>>
>> We use Postgres notifications. The spout's open method registers for
>> database notifications (add, update, delete). Each time the spout's
>> nextTuple method is called, we check for pending notifications and process
>> them accordingly.
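>>
>> Roughly, the open/nextTuple pair can look like this (just a sketch built on
>> the postgresql JDBC driver's LISTEN/NOTIFY support; the channel name,
>> connection details, and class name are illustrative):
>>
>> import java.sql.Connection;
>> import java.sql.DriverManager;
>> import java.sql.SQLException;
>> import java.util.Map;
>> import org.postgresql.PGConnection;
>> import org.postgresql.PGNotification;
>> import backtype.storm.spout.SpoutOutputCollector;
>> import backtype.storm.task.TopologyContext;
>> import backtype.storm.topology.OutputFieldsDeclarer;
>> import backtype.storm.topology.base.BaseRichSpout;
>> import backtype.storm.tuple.Fields;
>> import backtype.storm.tuple.Values;
>>
>> public class PostgresNotificationSpout extends BaseRichSpout {
>>     private SpoutOutputCollector collector;
>>     private Connection conn;
>>
>>     @Override
>>     public void open(Map conf, TopologyContext context,
>>                      SpoutOutputCollector collector) {
>>         this.collector = collector;
>>         try {
>>             conn = DriverManager.getConnection(
>>                     "jdbc:postgresql://localhost/yourdb", "user", "pass");
>>             // Register for the notifications your triggers send
>>             // on add, update, delete.
>>             conn.createStatement().execute("LISTEN table_changes");
>>         } catch (SQLException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>>
>>     @Override
>>     public void nextTuple() {
>>         try {
>>             // Check for pending notifications and emit their payloads.
>>             // The cast works on a plain (unpooled) driver connection.
>>             PGNotification[] notes =
>>                     ((PGConnection) conn).getNotifications();
>>             if (notes != null) {
>>                 for (PGNotification n : notes) {
>>                     collector.emit(new Values(n.getParameter()));
>>                 }
>>             }
>>         } catch (SQLException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         declarer.declare(new Fields("payload"));
>>     }
>> }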
>>
>> Good Luck
>>
>> John
>>
>>
>> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <[email protected]> wrote:
>>
>>> Dear all
>>>
>>> I am working on an implementation of a spout whose stream comes from a
>>> postgresql ingress API (an in-house project). All I know to do for now is
>>> to get the spout connected to postgresql, retrieve the data periodically,
>>> store it in a queue, and then emit it to the topology. If anyone has done a
>>> similar job, I hope to get some instructions and details from you.
>>>
>>>
>>> thanks
>>>
>>> Alec
>>>
>>>
>>>
>>
>>
>
>