Hi,

I have installed spark-0.9.0-incubating-bin-cdh4 and I am using Apache Flume for streaming. I am using the streaming.examples.FlumeEventCount example, and I have written an Avro conf file for Flume. When I try to do streaming in Spark and run the following command, it throws an error (the full stack trace is below):

./bin/run-example org.apache.spark.streaming.examples.FlumeEventCount local[2] ip 10001
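
For context, this is roughly what I understand the example to be doing with the host and port arguments. This is my own paraphrase of the 0.9.0 sources, not a verbatim copy, so details may be slightly off; the point is that the receiver itself opens an Avro NettyServer bound to host:port, so the host must be an address of the machine the receiver runs on, and the Flume avro sink is then pointed at that same host and port.

// Paraphrase of streaming.examples.FlumeEventCount (my reading of the 0.9.0 code,
// not a verbatim copy).
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeEventCountSketch {
  def main(args: Array[String]) {
    val Array(master, host, port) = args
    val ssc = new StreamingContext(master, "FlumeEventCount", Milliseconds(2000),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
    // The Flume receiver opens an Avro NettyServer on host:port, which is where
    // the bind in the stack trace below happens.
    val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
  }
}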

org.jboss.netty.channel.ChannelException: Failed to bind to: /ip:9988
        at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
        at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
        at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:143)
        at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126)
        at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173)
        at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.net.BindException: Cannot assign requested address
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
        at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        ... 3 more
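
From the "Caused by" it looks like the failure is a plain socket bind on that address ("Cannot assign requested address" usually means the address does not belong to any interface on the machine doing the bind). I assume the same thing can be checked outside Spark with a small sketch like this, using only the JDK, nothing Spark-specific:

// Standalone bind check (just an illustration): tries to bind a server socket to
// the same <host> <port> passed to FlumeEventCount, on the machine where the
// Spark receiver runs.
import java.net.{InetSocketAddress, ServerSocket}

object BindCheck {
  def main(args: Array[String]) {
    val Array(host, port) = args
    val socket = new ServerSocket()
    try {
      socket.bind(new InetSocketAddress(host, port.toInt))
      println("Successfully bound to " + host + ":" + port)
    } finally {
      socket.close()
    }
  }
}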

Can you please help?

This is my flumetest.conf:

a1.sources = tail-file
a1.channels = c1
a1.sinks = avro-sink

# define source and sink
a1.sources.tail-file.type = exec
a1.sources.tail-file.command = tail -F /home/hduser/Flume/test.log
a1.sinks.avro-sink.type = avro
# hostname of agent a2's avro source (a2's IP address or host name)
a1.sinks.avro-sink.hostname = ip
a1.sinks.avro-sink.port = 10001

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.tail-file.channels = c1
a1.sinks.avro-sink.channel = c1

This is my Flumeslavetest.conf:

a2.sources = avro-collection-source
a2.sinks = hdfs-sink
a2.channels = mem-channel

# define the flow
a2.sources.avro-collection-source.channels = mem-channel
a2.sinks.hdfs-sink.channel = mem-channel
a2.channels.mem-channel.type = memory
a2.channels.mem-channel.capacity = 1000

# avro source properties
a2.sources.avro-collection-source.type = avro
# bind address: agent a2's IP address or host name
a2.sources.avro-collection-source.bind = ip
a2.sources.avro-collection-source.port = 10001

# hdfs sink properties
a2.sinks.hdfs-sink.type = hdfs
a2.sinks.hdfs-sink.hdfs.writeFormat = Text
a2.sinks.hdfs-sink.hdfs.filePrefix =  testing
a2.sinks.hdfs-sink.hdfs.path = hdfs://ip:8020/testingData


My Flume setup is running properly; it is able to write files to HDFS.
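
To narrow down the Spark side, I was also thinking a single test event could be pushed directly at the Avro port the receiver should be listening on, bypassing the tail-file source entirely. The following is only my rough sketch, adapted from my reading of the Avro IPC and flume-ng-sdk APIs, so the class names may need adjusting:

// Rough sketch: send one Flume event over Avro RPC to <host>:<port>, where the
// Spark FlumeEventCount receiver (or the a2 avro source) is listening.
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}

object SendTestEvent {
  def main(args: Array[String]) {
    val Array(host, port) = args
    val transceiver = new NettyTransceiver(new InetSocketAddress(host, port.toInt))
    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
    val event = new AvroFlumeEvent()
    event.setHeaders(new java.util.HashMap[CharSequence, CharSequence]())
    event.setBody(ByteBuffer.wrap("test event".getBytes("UTF-8")))
    println("append returned: " + client.append(event))
    transceiver.close()
  }
}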

Please help me figure out how to resolve this error.

Regards,
Neha Singh

