I created a custom Flume sink.
I added the appropriate jars to the lib directory of flume-ng.
I set the classpath in conf/flume-env.sh.
I configured conf/flume.conf as follows:
# Sources, channels and sinks are defined per agent
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
# Define the custom Accumulo sink (flumeSink.AccumuloSink)
# and connect it to the other end of the same channel.
agent1.sinks.accumulo-sink1.channel = ch1
agent1.sinks.accumulo-sink1.type = flumeSink.AccumuloSink
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = accumulo-sink1
I ran Flume and got a NullPointerException:
./bin/flume-ng node --conf conf/ -f conf/flume.conf -n agent1
2012-07-10 15:55:38,990 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 1
2012-07-10 15:55:38,994 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)] Flume node starting - agent1
2012-07-10 15:55:39,001 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:110)] Node manager starting
2012-07-10 15:55:39,001 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)] Configuration provider starting
2012-07-10 15:55:39,004 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 9
2012-07-10 15:55:39,010 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:114)] Node manager started
2012-07-10 15:55:39,010 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:87)] Configuration provider started
2012-07-10 15:55:39,010 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes
2012-07-10 15:55:39,015 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:196)] Reloading configuration file:conf/flume.conf
2012-07-10 15:55:39,023 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.properties.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:225)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1]
SOURCES: {avro-source1=ComponentConfiguration[avro-source1]
CONFIG: {port=41414, channels=ch1, type=avro, bind=0.0.0.0}
RUNNER: ComponentConfiguration[runner]
CONFIG: {}
}
CHANNELS: {ch1=ComponentConfiguration[ch1]
CONFIG: {type=memory}
}
SINKS: {accumulo-sink1=ComponentConfiguration[accumulo-sink1]
CONFIG: {type=flumeSink.AccumuloSink, channel=ch1}
RUNNER: ComponentConfiguration[runner]
CONFIG: {}
}
2012-07-10 15:55:39,024 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)] Post-validation flume configuration contains configuation for agents: [agent1]
2012-07-10 15:55:39,026 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:67)] Creating instance of channel ch1 type memory
2012-07-10 15:55:39,037 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:73)] Creating instance of source avro-source1, type avro
2012-07-10 15:55:39,057 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:69)] Creating instance of sink accumulo-sink1 typeflumeSink.AccumuloSink
2012-07-10 15:55:39,074 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:77)] Sink type flumeSink.AccumuloSink is a custom type
2012-07-10 15:55:39,118 (conf-file-poller-0) [DEBUG - org.apache.hadoop.conf.Configuration.<init>(Configuration.java:226)]
java.io.IOException: config(config)
    at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:226)
    at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:184)
    at org.apache.hadoop.mapreduce.JobContext.<init>(JobContext.java:52)
    at org.apache.hadoop.mapreduce.Job.<init>(Job.java:49)
    at accumuloAccess.writers.IngestToSystemBehavior.setInstance(IngestToSystemBehavior.java:42)
    at flumeSink.AccumuloSink.<init>(AccumuloSink.java:41)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
    at java.lang.Class.newInstance0(Class.java:372)
    at java.lang.Class.newInstance(Class.java:325)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:102)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:303)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:214)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
2012-07-10 15:55:39,120 (conf-file-poller-0) [ERROR - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:205)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to create sink: accumulo-sink1, type: flumeSink.AccumuloSink, class: flumeSink.AccumuloSink
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:108)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:303)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:214)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:230)
    at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:184)
    at org.apache.hadoop.mapreduce.JobContext.<init>(JobContext.java:52)
    at org.apache.hadoop.mapreduce.Job.<init>(Job.java:49)
    at accumuloAccess.writers.IngestToSystemBehavior.setInstance(IngestToSystemBehavior.java:42)
    at flumeSink.AccumuloSink.<init>(AccumuloSink.java:41)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
    at java.lang.Class.newInstance0(Class.java:372)
    at java.lang.Class.newInstance(Class.java:325)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:102)
    ... 13 more
2012-07-10 15:56:09,016 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes
2012-07-10 15:56:39,016 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes
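Reading the trace above: the NullPointerException is raised while the AccumuloSink constructor is running (AccumuloSink.java:41, which calls IngestToSystemBehavior.setInstance, which creates a Hadoop Job), and DefaultSinkFactory.create then reports it as "Unable to create sink". The sketch below reproduces only that shape in plain Java, so it can be tried outside Flume. Every class name in it (SinkFactorySketch, BrokenSink, WorkingSink) is a hypothetical stand-in, not a Flume, Hadoop, or Accumulo class, and it uses Constructor.newInstance with unwrapping rather than the Class.newInstance call that appears in the trace.

```java
import java.lang.reflect.InvocationTargetException;

public class SinkFactorySketch {

    /** Hypothetical stand-in for a sink whose constructor does setup that fails. */
    public static class BrokenSink {
        public BrokenSink() {
            // Simulates a dependency that is not available at construction time.
            String missing = null;
            missing.length(); // throws NullPointerException
        }
    }

    /** Hypothetical stand-in for a sink whose constructor does no work. */
    public static class WorkingSink {
        public WorkingSink() { }
    }

    /** Instantiates a class by name and wraps any failure, keeping the cause. */
    public static Object create(String className) {
        try {
            Class<?> cls = Class.forName(className);
            return cls.getDeclaredConstructor().newInstance();
        } catch (InvocationTargetException e) {
            // Constructor.newInstance wraps what the constructor threw; unwrap it
            // so the original exception shows up as the direct cause.
            throw new IllegalStateException("Unable to create sink: " + className, e.getCause());
        } catch (ReflectiveOperationException e) {
            // Class not found, no accessible no-arg constructor, and similar.
            throw new IllegalStateException("Unable to create sink: " + className, e);
        }
    }

    public static void main(String[] args) {
        try {
            create(BrokenSink.class.getName());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            System.out.println("Caused by: " + e.getCause().getClass().getName());
        }
    }
}
```

The point of the sketch is only that the wrapping exception comes from the factory, while the frame to inspect is the first application frame under "Caused by", here the constructor itself.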
Any pointers on how to solve this problem?
Thanks.
From: Eric Sammer [mailto:[email protected]]
Sent: Wednesday, June 20, 2012 3:56 PM
To: [email protected]
Subject: Re: accumulo sink
Ray:
Not off hand, no. That said, it shouldn't be terribly difficult to
build. I'm less familiar with the Accumulo client APIs, but it's so
close to HBase that I don't believe it would take you more than a day or
so with basic testing. Take a look at one of the sinks (one of the more
basic being the LoggerSink) to get a template to start from.
On Tue, Jun 19, 2012 at 1:14 PM, Martin, Ray <[email protected]>
wrote:
Is anyone aware of a Flume sink for Accumulo?
Thanx.
--
Eric Sammer
twitter: esammer
data: www.cloudera.com