Hi team, I have integrated Spark Streaming with Flume, and both my Flume agent and my Spark job fail with the errors below. Your help would be highly appreciated. Many thanks.
My Flume configuration is as follows:

flume.conf
***********
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel2

# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /mapr/rawdata/input/syslog

agent1.sinks.sink1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sink1.hostname = <hostname>.com
agent1.sinks.sink1.port = 43333
agent1.sinks.sink1.batchSize = 200

agent1.channels.channel2.type = FILE
agent1.channels.channel2.checkpointDir = /home/mapr/flume/java/checkpoint
agent1.channels.channel2.dataDirs = /home/mapr/flume/java/data
agent1.channels.channel2.capacity = 1000000
agent1.channels.channel2.transactionCapacity = 1000

agent1.sources.source1.channels = channel2
agent1.sinks.sink1.channel = channel2

Error at Flume side
***********************
16/08/03 02:15:30 ERROR source.ExecSource: Failed while running command: tail -f /mapr/rawdata/input/syslog
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel channel2 { dataDirs: [/home/mapr/flume/java/data] }
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:382)
        at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:342)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=channel2]
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:465)
        at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
        at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
        ... 7 more
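Going by the ChannelFullException message itself (a sink batch size that is too low, or a downstream system running slower than normal), I am considering raising the sink batch size and the channel's transactionCapacity as a next step. The values below are only an illustrative sketch I have not yet verified; transactionCapacity has to stay at or above the largest batch size used against the channel:

# Tuning sketch -- illustrative values only, not yet tested
agent1.sinks.sink1.batchSize = 1000
# transactionCapacity must be >= the sink (and source) batch size
agent1.channels.channel2.transactionCapacity = 10000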
Error at Spark side
****************
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "qtp422134367-185" Exception in thread "qtp422134367-101" Exception in thread "qtp422134367-97" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter(AbstractQueuedSynchronizer.java:606)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:247)
        at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:537)
        at java.lang.Thread.run(Thread.java:745)
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "dispatcher-event-loop-20" java.lang.OutOfMemoryError: GC overhead limit exceeded
16/08/02 11:49:06 WARN DefaultChannelPipeline: An exception was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.OutOfMemoryError: GC overhead limit exceeded
16/08/02 11:49:06 ERROR InsertIntoHadoopFsRelation: Aborting job.
java.lang.OutOfMemoryError: GC overhead limit exceeded
16/08/02 11:49:06 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at io.netty.util.internal.MpscLinkedQueue.offer(MpscLinkedQueue.java:126)
        at io.netty.util.internal.MpscLinkedQueue.add(MpscLinkedQueue.java:221)
        at io.netty.util.concurrent.SingleThreadEventExecutor.fetchFromScheduledTaskQueue(SingleThreadEventExecutor.java:259)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:346)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
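For reference, my Spark side polls the SparkSink roughly like the sketch below (assuming the spark-streaming-flume artifact matching my Spark version; the object name, batch interval, and rate-limit values are placeholders, not the settings of the failing job). The backpressure and maxRate settings are what I plan to try against the "GC overhead limit exceeded" errors, along with larger --driver-memory / --executor-memory on spark-submit:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object SyslogFlumePoller {  // placeholder name
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SyslogFlumePoller")
      // Throttle intake when batches start lagging (illustrative tuning)
      .set("spark.streaming.backpressure.enabled", "true")
      // Cap events/sec per receiver (illustrative value)
      .set("spark.streaming.receiver.maxRate", "1000")

    val ssc = new StreamingContext(conf, Seconds(10))  // placeholder batch interval

    // Pull-based receiver: host/port must match agent1.sinks.sink1.* in flume.conf
    val events = FlumeUtils.createPollingStream(ssc, "<hostname>.com", 43333)

    // Decode event bodies and report the per-batch event count
    events.map(e => new String(e.event.getBody.array(), "UTF-8"))
          .count()
          .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Does this direction look right, or is there something else in the config that would explain both the channel filling up and the Spark-side OOM?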