Hi team,
I have integrated Spark Streaming with Flume, and both my Flume agent and my
Spark job fail with the errors below. Your help would be much appreciated.
Many thanks

My Flume configuration is as follows:
flume.conf
***********
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel2

# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /mapr/rawdata/input/syslog


agent1.sinks.sink1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sink1.hostname = <hostname>.com
agent1.sinks.sink1.port = 43333
agent1.sinks.sink1.batchSize = 200

agent1.channels.channel2.type = FILE
agent1.channels.channel2.checkpointDir = /home/mapr/flume/java/checkpoint
agent1.channels.channel2.dataDirs = /home/mapr/flume/java/data
agent1.channels.channel2.capacity = 1000000
agent1.channels.channel2.transactionCapacity = 1000

agent1.sources.source1.channels = channel2
agent1.sinks.sink1.channel = channel2
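
For context, since sink1 is a SparkSink, my Spark job consumes it with the
polling receiver that pairs with that sink. A minimal sketch of that wiring
is below; the app name, batch interval, and storage level are illustrative
placeholders, not my actual job:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Minimal polling receiver paired with SparkSink. The host and port
// must match agent1.sinks.sink1.hostname / agent1.sinks.sink1.port
// in flume.conf; everything else here is a placeholder.
object FlumePollingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Pull batches from the SparkSink running inside the Flume agent
    val stream = FlumeUtils.createPollingStream(
      ssc, "<hostname>.com", 43333, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Count events per batch just to confirm data is flowing
    stream.map(e => new String(e.event.getBody.array()))
          .count()
          .print()

    ssc.start()
    ssc.awaitTermination()
  }
}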



Error on the Flume side
***********************
16/08/03 02:15:30 ERROR source.ExecSource: Failed while running command: tail -f /mapr/rawdata/input/syslog
org.apache.flume.ChannelException: Unable to put batch on required channel: FileChannel channel2 { dataDirs: [/home/mapr/flume/java/data] }
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:382)
        at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:342)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=channel2]
        at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:465)
        at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
        at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
        ... 7 more
*************************
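
Based on the ChannelFullException message, I suspect the SparkSink is
draining the file channel more slowly than the exec source fills it. I may
try tuning along these lines; the numbers below are untested guesses on my
part, not values from a working setup:

# Illustrative tuning only -- untested guesses
agent1.sinks.sink1.batchSize = 1000
agent1.channels.channel2.capacity = 2000000
agent1.channels.channel2.transactionCapacity = 10000
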
Error on the Spark side
****************
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "qtp422134367-185" Exception in thread "qtp422134367-101" Exception in thread "qtp422134367-97" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter(AbstractQueuedSynchronizer.java:606)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:247)
        at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:537)
        at java.lang.Thread.run(Thread.java:745)
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "dispatcher-event-loop-20" java.lang.OutOfMemoryError: GC overhead limit exceeded
16/08/02 11:49:06 WARN DefaultChannelPipeline: An exception was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.OutOfMemoryError: GC overhead limit exceeded
16/08/02 11:49:06 ERROR InsertIntoHadoopFsRelation: Aborting job.
java.lang.OutOfMemoryError: GC overhead limit exceeded
16/08/02 11:49:06 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at io.netty.util.internal.MpscLinkedQueue.offer(MpscLinkedQueue.java:126)
        at io.netty.util.internal.MpscLinkedQueue.add(MpscLinkedQueue.java:221)
        at io.netty.util.concurrent.SingleThreadEventExecutor.fetchFromScheduledTaskQueue(SingleThreadEventExecutor.java:259)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:346)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
