Sorry, this was not a Flume issue. I had a leftover jar file from a previous version of Accumulo.
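For anyone who hits something similar: a stale jar of this kind can usually be spotted by listing artifacts that show up in the lib directory under more than one version. Below is a minimal sketch of that check, not anything that ships with Flume or Accumulo. It assumes jars follow the common `name-version.jar` naming; the class name `DuplicateJarFinder`, the method `findDuplicates`, and the default `lib` path are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reports artifacts present in more than one version.
public class DuplicateJarFinder {

    // Assumption: jar names look like "<artifact>-<version>.jar",
    // where the version starts with a digit (e.g. "accumulo-core-1.4.0.jar").
    private static final Pattern JAR_NAME =
            Pattern.compile("^(.+?)-(\\d[\\w.\\-]*)\\.jar$");

    // Groups jar file names by artifact and keeps only artifacts
    // that occur with two or more versions.
    static Map<String, List<String>> findDuplicates(List<String> jarNames) {
        Map<String, List<String>> byArtifact = new TreeMap<>();
        for (String name : jarNames) {
            Matcher m = JAR_NAME.matcher(name);
            if (m.matches()) {
                byArtifact.computeIfAbsent(m.group(1), k -> new ArrayList<>())
                          .add(m.group(2));
            }
            // Names without a recognizable version are ignored.
        }
        byArtifact.values().removeIf(versions -> versions.size() < 2);
        return byArtifact;
    }

    public static void main(String[] args) throws IOException {
        // Directory to scan; defaults to "lib" relative to the working directory.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        if (!Files.isDirectory(libDir)) {
            System.out.println("Not a directory: " + libDir);
            return;
        }
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                names.add(jar.getFileName().toString());
            }
        }
        findDuplicates(names).forEach((artifact, versions) ->
                System.out.println(artifact + " appears in versions " + versions));
    }
}
```

Run it against the flume-ng lib directory (passed as the first argument). Any artifact it prints is a candidate for cleanup. It only compares file names, so it will miss a stale class that was repackaged under a different jar name.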
It has been corrected. I think I now have it standing up. Still testing. Again, sorry.

From: Martin, Ray [mailto:[email protected]]
Sent: Tuesday, July 10, 2012 4:02 PM
To: [email protected]
Subject: RE: accumulo sink

I created a custom flume sink.
Added appropriate jars into lib directory of flume-ng.
Set classpath in conf/flume-env.sh
Configured conf/flume.conf as follows:

# Sources, channels and sinks are defined per agent

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.accumulo-sink1.channel = ch1
agent1.sinks.accumulo-sink1.type = flumeSink.AccumuloSink

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = accumulo-sink1

ran flume and get nullpointer:

./bin/flume-ng node --conf conf/ -f conf/flume.conf -n agent1

2012-07-10 15:55:38,990 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 1
2012-07-10 15:55:38,994 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)] Flume node starting - agent1
2012-07-10 15:55:39,001 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:110)] Node manager starting
2012-07-10 15:55:39,001 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)] Configuration provider starting
2012-07-10 15:55:39,004 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:58)] Starting lifecycle supervisor 9
2012-07-10 15:55:39,010 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:114)] Node manager started
2012-07-10 15:55:39,010 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:87)] Configuration provider started
2012-07-10 15:55:39,010 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes
2012-07-10 15:55:39,015 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:196)] Reloading configuration file:conf/flume.conf
2012-07-10 15:55:39,023 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.properties.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:225)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1]
SOURCES: {avro-source1=ComponentConfiguration[avro-source1]
  CONFIG: {port=41414, channels=ch1, type=avro, bind=0.0.0.0}
  RUNNER: ComponentConfiguration[runner]
    CONFIG: {}
}
CHANNELS: {ch1=ComponentConfiguration[ch1]
  CONFIG: {type=memory}
}
SINKS: {accumulo-sink1=ComponentConfiguration[accumulo-sink1]
  CONFIG: {type=flumeSink.AccumuloSink, channel=ch1}
  RUNNER: ComponentConfiguration[runner]
    CONFIG: {}
}
2012-07-10 15:55:39,024 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)] Post-validation flume configuration contains configuation for agents: [agent1]
2012-07-10 15:55:39,026 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:67)] Creating instance of channel ch1 type memory
2012-07-10 15:55:39,037 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:73)] Creating instance of source avro-source1, type avro
2012-07-10 15:55:39,057 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:69)] Creating instance of sink accumulo-sink1 typeflumeSink.AccumuloSink
2012-07-10 15:55:39,074 (conf-file-poller-0) [DEBUG - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:77)] Sink type flumeSink.AccumuloSink is a custom type
2012-07-10 15:55:39,118 (conf-file-poller-0) [DEBUG - org.apache.hadoop.conf.Configuration.<init>(Configuration.java:226)] java.io.IOException: config(config)
    at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:226)
    at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:184)
    at org.apache.hadoop.mapreduce.JobContext.<init>(JobContext.java:52)
    at org.apache.hadoop.mapreduce.Job.<init>(Job.java:49)
    at accumuloAccess.writers.IngestToSystemBehavior.setInstance(IngestToSystemBehavior.java:42)
    at flumeSink.AccumuloSink.<init>(AccumuloSink.java:41)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
    at java.lang.Class.newInstance0(Class.java:372)
    at java.lang.Class.newInstance(Class.java:325)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:102)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:303)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:214)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
2012-07-10 15:55:39,120 (conf-file-poller-0) [ERROR - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:205)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to create sink: accumulo-sink1, type: flumeSink.AccumuloSink, class: flumeSink.AccumuloSink
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:108)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadSinks(PropertiesFileConfigurationProvider.java:303)
    at org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.load(PropertiesFileConfigurationProvider.java:214)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.doLoad(AbstractFileConfigurationProvider.java:124)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider.access$300(AbstractFileConfigurationProvider.java:38)
    at org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:203)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:230)
    at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:184)
    at org.apache.hadoop.mapreduce.JobContext.<init>(JobContext.java:52)
    at org.apache.hadoop.mapreduce.Job.<init>(Job.java:49)
    at accumuloAccess.writers.IngestToSystemBehavior.setInstance(IngestToSystemBehavior.java:42)
    at flumeSink.AccumuloSink.<init>(AccumuloSink.java:41)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
    at java.lang.Class.newInstance0(Class.java:372)
    at java.lang.Class.newInstance(Class.java:325)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:102)
    ... 13 more
2012-07-10 15:56:09,016 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes
2012-07-10 15:56:39,016 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:189)] Checking file:conf/flume.conf for changes

Any pointers for me to figure out how to solve my problem? Thanx.

From: Eric Sammer [mailto:[email protected]]
Sent: Wednesday, June 20, 2012 3:56 PM
To: [email protected]
Subject: Re: accumulo sink

Ray:

Not off hand, no. That said, it shouldn't be terribly difficult to build. I'm less familiar with the Accumulo client APIs, but it's so close to HBase that I don't believe it would take you more than a day or so with basic testing. Take a look at one of the sinks (one of the more basic being the LoggerSink) to get a template to start from.

On Tue, Jun 19, 2012 at 1:14 PM, Martin, Ray <[email protected]> wrote:

Is anyone aware of a Flume sink for Accumulo? Thanx.

--
Eric Sammer
twitter: esammer
data: www.cloudera.com
