Hey guys,
I think i still need some help on the custom flume client. Right now I have
finished the Avro sink client in my storm bolt. On a test machine, i
started a flume agent:
StormAgent.sources = avro
StormAgent.channels = MemChannel
StormAgent.sinks = HDFS

StormAgent.sources.avro.type = avro
StormAgent.sources.avro.channels = MemChannel
StormAgent.sources.avro.bind = localhost
StormAgent.sources.avro.port = 10001

I assume this will automatically wait on the localhost:10001?

When I run my LoadBalancingRpcClient. on the same machine, I receive
connection refused exception:
org.apache.flume.FlumeException: NettyAvroRpcClient { host: localhost,
port: 10001 }: RPC connection error
at
org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
at
org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
at
org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
at
org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at
org.apache.flume.api.LoadBalancingRpcClient.createClient(LoadBalancingRpcClient.java:214)
at
org.apache.flume.api.LoadBalancingRpcClient.getClient(LoadBalancingRpcClient.java:197)
at
org.apache.flume.api.LoadBalancingRpcClient.append(LoadBalancingRpcClient.java:71)
at
com.walmartlabs.targeting.storm.bolt.HubbleStreamAvroSinkBolt.execute(HubbleStreamAvroSinkBolt.java:89)
at
backtype.storm.daemon.executor$fn__4050$tuple_action_fn__4052.invoke(executor.clj:566)
at
backtype.storm.daemon.executor$mk_task_receiver$fn__3976.invoke(executor.clj:345)
at
backtype.storm.disruptor$clojure_handler$reify__1606.onEvent(disruptor.clj:43)
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:84)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
at
backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Error connecting to localhost/
127.0.0.1:10001

Is this still some config issue? I tried ip address as well, but with the
same error. I am this close now...
Thank you for any help!
Chen



On Thu, Jan 9, 2014 at 10:09 PM, Chen Wang <[email protected]>wrote:

> Ashish,
> Interesting enough, i was initially doing 1 too, and had a working
> version. But finally I give it up because in my bolt i have to flush to
> hdfs either when data reaching certain size or a timer times out, which is
> exactly what flume can offer. Also it has some complexity of grouping
> entries within the same partition while with flume it is a piece of cake.
>
> Thank you so much for all you guys's input. It helped me a lot !
> Chen
>
>
>
> On Thu, Jan 9, 2014 at 10:00 PM, Ashish <[email protected]> wrote:
>
>> Got it!
>>
>> My first reaction was to use HDFS bolt to write data directly to HDFS,
>> but couldn't find an implementation for the same. My knowledge is limited
>> for Storm.
>> If the data is already flowing through Storm, you got two options
>> 1. Write a bolt to dump data to HDFS
>> 2. Write a Flume bolt using RPC client as recommended in thread, and
>> reuse Flume's capabilities.
>>
>> If you already have Flume installation running, #2 is quickest way of
>> running. Otherwise also, installing and running Flume is like a walk in the
>> park :)
>>
>> You can also watch related discussion on
>> https://issues.apache.org/jira/browse/FLUME-1286. There is some good
>> info in the JIRA.
>>
>> thanks
>> ashish
>>
>>
>>
>>
>> On Fri, Jan 10, 2014 at 11:08 AM, Chen Wang 
>> <[email protected]>wrote:
>>
>>> Ashish,
>>> Since we already use storm for other real time processing, i thus want
>>> to re utilize it. The biggest advantage for me of using storm in this case
>>> is that i could use storm's spout to read from our socket server
>>> continuously, and the storm framework can ensure it never stops. Meantime,
>>> i can also easily filter out /translate the data in bolt before sending to
>>> flume. For this piece of data stream, right now my first step is to get it
>>> into hdfs, but will add real time processing soon.
>>> Does that make sense to you?
>>> Thanks,
>>> Chen
>>>
>>>
>>> On Thu, Jan 9, 2014 at 9:29 PM, Ashish <[email protected]> wrote:
>>>
>>>> Why do you need Storm? Are you doing any real time processing? If not,
>>>> IMHO, avoid Storm.
>>>>
>>>> Can use something like this
>>>>
>>>> Socket -> Load Balanced RPC Client -> Flume Topology with HA
>>>>
>>>> What Application level protocol are you using at Socket level?
>>>>
>>>>
>>>> On Fri, Jan 10, 2014 at 10:50 AM, Chen Wang <[email protected]
>>>> > wrote:
>>>>
>>>>> Jeff, Joao,
>>>>> Thanks for the pointer!
>>>>> I think i am getting close here:
>>>>> 1. set up a cluster of flume agent with redundancies, source as avro,
>>>>> sink as HDFS.
>>>>> 2 use storm(not quite necessary) to read from our socket server, then
>>>>> in the bolt, using flume client (load balancing rpc client) to send the
>>>>> event to the agent set up in step 1.
>>>>>
>>>>> Then I thus get all the benefit of storm and flume. Does this set up
>>>>> look right to you?
>>>>> thank you very much,
>>>>> Chen
>>>>>
>>>>>
>>>>> On Thu, Jan 9, 2014 at 8:58 PM, Joao Salcedo 
>>>>> <[email protected]>wrote:
>>>>>
>>>>>> Hi Chen,
>>>>>>
>>>>>> Maybe it would be worth checking this
>>>>>>
>>>>>> http://flume.apache.org/FlumeDeveloperGuide.html#loadbalancing-rpc-client
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Joao
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 10, 2014 at 3:50 PM, Jeff Lord <[email protected]>wrote:
>>>>>>
>>>>>>> Have you taken a look at the load balancing rpc client?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 9, 2014 at 8:43 PM, Chen Wang <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Jeff,
>>>>>>>> I have read this ppt at the beginning, but didn't find solution to
>>>>>>>> my user case. To simplify my case, I only have 1 data source(composed 
>>>>>>>> of 5
>>>>>>>> socket server)  and i am looking for a fault tolerant deployment of 
>>>>>>>> flume,
>>>>>>>> that can read from this single data source and sink to hdfs in fault
>>>>>>>> tolerant mode: when one node dies, another flume node can pick up and
>>>>>>>> continue;
>>>>>>>> Thanks,
>>>>>>>> Chen
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 9, 2014 at 7:49 PM, Jeff Lord <[email protected]>wrote:
>>>>>>>>
>>>>>>>>> Chen,
>>>>>>>>>
>>>>>>>>> Have you taken a look at this presentation on Planning and
>>>>>>>>> Deploying Flume from ApacheCon?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> http://archive.apachecon.com/na2013/presentations/27-Wednesday/Big_Data/11:45-Mastering_Sqoop_for_Data_Transfer_for_Big_Data-Arvind_Prabhakar/Arvind%20Prabhakar%20-%20Planning%20and%20Deploying%20Apache%20Flume.pdf
>>>>>>>>>
>>>>>>>>> It may have the answers you need.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Jeff
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jan 9, 2014 at 7:24 PM, Chen Wang <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Saurabh.
>>>>>>>>>> If that is the case, I am actually thinking about using storm
>>>>>>>>>> spout to talk to our socket server so that the storm cluster can 
>>>>>>>>>> take care
>>>>>>>>>> of the reading socket server part. Then in each storm node, start a 
>>>>>>>>>> flume
>>>>>>>>>> agent, listening on a RPC port and write to HDFS(with fail over) 
>>>>>>>>>> .Then in
>>>>>>>>>> the storm bolt, simply send the data to RPC so that flume can get it.
>>>>>>>>>> How do you think of this setup? It takes care of both failover on
>>>>>>>>>> the source(by storm) and on the sink(by flume) But It looks a little
>>>>>>>>>> complicated for me.
>>>>>>>>>> Chen
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 9, 2014 at 7:18 PM, Saurabh B <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Chen,
>>>>>>>>>>>
>>>>>>>>>>> I think Flume doesn't have a way to configure multiple sources
>>>>>>>>>>> pointing to same data source. Of course you can do that, but you 
>>>>>>>>>>> will end
>>>>>>>>>>> up with duplicate data. Flume offers fail over at the sink level.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jan 9, 2014 at 6:56 PM, Chen Wang <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ok. so after more researching:) It seems that what i need is
>>>>>>>>>>>> the failover for agent source, (not fail over for sink):
>>>>>>>>>>>> If one agent dies, another same kind of agent will start
>>>>>>>>>>>> running.
>>>>>>>>>>>> Does flume support this scenario?
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Chen
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jan 9, 2014 at 3:12 PM, Chen Wang <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> After reading more docs, it seems that if I want to achieve my
>>>>>>>>>>>>> goal, i have to do the following:
>>>>>>>>>>>>> 1. Having one agent with the custom source running on one
>>>>>>>>>>>>> node. This agent reads from those 5 socket server, and sink to 
>>>>>>>>>>>>> some kind of
>>>>>>>>>>>>> sink(maybe another socket?)
>>>>>>>>>>>>> 2. On another(or more) machines, setting up collectors that
>>>>>>>>>>>>> read from the agent sink in 1, and sink to hdfs.
>>>>>>>>>>>>> 3. Having a master node managing nodes in 1,2.
>>>>>>>>>>>>>
>>>>>>>>>>>>> But it seems to be overskilled in my case: in 1, i can already
>>>>>>>>>>>>> sink to hdfs. Since the data available at socket server are much 
>>>>>>>>>>>>> faster
>>>>>>>>>>>>> than the data translation part.  I want to be able to later add 
>>>>>>>>>>>>> more nodes
>>>>>>>>>>>>> to do the translation job. so what is the correct setup?
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Chen
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jan 9, 2014 at 2:38 PM, Chen Wang <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guys,
>>>>>>>>>>>>>> In my environment, the client is 5 socket servers. Thus i
>>>>>>>>>>>>>> wrote a custom source spawning 5 threads reading from each of 
>>>>>>>>>>>>>> them
>>>>>>>>>>>>>> infinitely,and the sink is hdfs(hive table). The work fine by 
>>>>>>>>>>>>>> running flume-ng
>>>>>>>>>>>>>> agent.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But how can i deploy this in distributed mode(cluster)? I am
>>>>>>>>>>>>>> confused about the 3 ties(agent,collector,storage) mentioned in 
>>>>>>>>>>>>>> the doc.
>>>>>>>>>>>>>> Does it apply to my case? How can I separate my 
>>>>>>>>>>>>>> agent/collect/storage?
>>>>>>>>>>>>>> Apparently i can only have one agent running: multiple agent 
>>>>>>>>>>>>>> will result in
>>>>>>>>>>>>>> getting duplicates from the socket server. But I want that if 
>>>>>>>>>>>>>> one agent
>>>>>>>>>>>>>> dies, other agent can take it up. I would also like to be able 
>>>>>>>>>>>>>> to add
>>>>>>>>>>>>>> horizontal scalability for writing to hdfs. How can I achieve 
>>>>>>>>>>>>>> all this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thank you very much for your advice.
>>>>>>>>>>>>>> Chen
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Mailing List Archives,
>>>>>>>>>>> QnaList.com
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> thanks
>>>> ashish
>>>>
>>>> Blog: http://www.ashishpaliwal.com/blog
>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>
>>>
>>>
>>
>>
>> --
>> thanks
>> ashish
>>
>> Blog: http://www.ashishpaliwal.com/blog
>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>
>
>

Reply via email to