Hi,

There are other ways to do it, but I would:

1) Set up two agents, one on each host: node105 and node102.
2) In each agent, run two separate flows, one for each file.

On each node the two flows are (source1, channel1, sink1) and
(source2, channel2, sink2):

node105.sources = source1 source2
node105.channels = channel1 channel2
node105.sinks = sink1 sink2

node105.sources.source1.type = exec
node105.sources.source1.command = tail -F file1
node105.sources.source1.channels = channel1

node105.sources.source2.type = exec
node105.sources.source2.command = tail -F file2
node105.sources.source2.channels = channel2

node105.channels.channel1.type = memory
node105.channels.channel2.type = memory

node105.sinks.sink1.type = avro
node105.sinks.sink1.batch-size = 1000
node105.sinks.sink1.channel = channel1
node105.sinks.sink1.hostname = node102
node105.sinks.sink1.port = 8881

node105.sinks.sink2.type = avro
node105.sinks.sink2.batch-size = 1000
node105.sinks.sink2.channel = channel2
node105.sinks.sink2.hostname = node102
node105.sinks.sink2.port = 8882


node102.sources = source1 source2
node102.channels = channel1 channel2
node102.sinks = sink1 sink2

node102.sources.source1.type = avro
node102.sources.source1.bind = 0.0.0.0
node102.sources.source1.port = 8881
node102.sources.source1.channels = channel1

node102.sources.source2.type = avro
node102.sources.source2.bind = 0.0.0.0
node102.sources.source2.port = 8882
node102.sources.source2.channels = channel2

node102.channels.channel1.type = memory
node102.channels.channel2.type = memory

node102.sinks.sink1.type = FILE_ROLL
node102.sinks.sink1.batchSize = 1000
node102.sinks.sink1.channel = channel1
node102.sinks.sink1.sink.directory = /dir1
node102.sinks.sink1.sink.rollInterval = 86400
node102.sinks.sink1.sink.serializer = TEXT

node102.sinks.sink2.type = FILE_ROLL
node102.sinks.sink2.batchSize = 1000
node102.sinks.sink2.channel = channel2
node102.sinks.sink2.sink.directory = /dir2
node102.sinks.sink2.sink.rollInterval = 86400
node102.sinks.sink2.sink.serializer = TEXT

On Thu, Sep 27, 2012 at 11:37 AM, Cochran, David M (Contractor)
<[email protected]> wrote:
> Hi Brock,
>
> I hope I'm not becoming a total pain..
> I understand your example (actually your example from last week was
> able to get me there easily). The current config you sent shows the
> generated logs ending up in a local directory on the same box. My
> trouble is understanding how to get the two tail-sources from this
> machine generating the logs to another machine that's acting as the
> permanent home for the logs, so they end up in their own directories,
> within one running instance of flume on the receiving machine. I was
> able to make a real mess of things and get the two source files to end
> up as one combined file at the other side (total mess). Just couldn't
> figure out how to get them to split apart.
>
> So far I've only been able to get the 2 files from the source machine
> to the remote machine in this manner: one instance of flume running on
> the source, sending one log file to the receiving machine on one port
> and the other log file on another, with the receiving machine running
> two instances of flume, each listening on a different port and writing
> out the data it receives to a set directory.
>
> Now while this cobbled-together method works, it will not scale up
> well at all.... with several servers each sending a few log files to
> the server that will be the final resting place for the logs. Dozens
> of flume instances running on the receiving server across a myriad of
> ports = disaster.
>
> Perhaps this confuses the matter further.
>
> -Dave
>
>
> -----Original Message-----
> From: Brock Noland [mailto:[email protected]]
> Sent: Thursday, September 27, 2012 9:46 AM
> To: Cochran, David M (Contractor)
> Subject: Re: multiple sources single channel
>
> This can all be in the same agent.
>
> node105.sources.source1.type = exec
> node105.sources.source1.command = tail -F file1
> node105.sources.source1.channels = channel1
>
> node105.sources.source2.type = exec
> node105.sources.source2.command = tail -F file2
> node105.sources.source2.channels = channel2
>
> node105.channels.channel1.type = memory
> node105.channels.channel2.type = memory
>
> node105.sinks.sink1.type = FILE_ROLL
> node105.sinks.sink1.batchSize = 1000
> node105.sinks.sink1.channel = channel1
> node105.sinks.sink1.sink.directory = /dir1
> node105.sinks.sink1.sink.rollInterval = 86400
> node105.sinks.sink1.sink.serializer = TEXT
>
> node105.sinks.sink2.type = FILE_ROLL
> node105.sinks.sink2.batchSize = 1000
> node105.sinks.sink2.channel = channel2
> node105.sinks.sink2.sink.directory = /dir2
> node105.sinks.sink2.sink.rollInterval = 86400
> node105.sinks.sink2.sink.serializer = TEXT
>
>
> On Thu, Sep 27, 2012 at 7:47 AM, Brock Noland <[email protected]>
> wrote:
>> Ok, I'll send you an example later today.
>>
>> --
>> Brock Noland
>> Sent with Sparrow
>>
>> On Thursday, September 27, 2012 at 7:28 AM, Cochran, David M
>> (Contractor) wrote:
>>
>> What I'd like is for each file to end up in its own directory.
>>
>> -Dave
>>
>> -----Original Message-----
>> From: Brock Noland [mailto:[email protected]]
>> Sent: Wednesday, September 26, 2012 3:51 PM
>> To: Cochran, David M (Contractor)
>> Subject: Re: multiple sources single channel
>>
>> OK you have two tail sources. What is the end result you are looking
>> for?
>>
>> 1) Each file which is being tailed ends up in its own directory
>> 2) Both files end up in the same directory
>> 3) something else, if so pls explain
>>
>> Brock
>>
>> On Wed, Sep 26, 2012 at 2:50 PM, Cochran, David M (Contractor)
>> <[email protected]> wrote:
>>
>> Hi Brock,
>>
>> So, are you saying that two sources go into one channel to the sink,
>> and the sink can split the stream back out to two files for storage
>> somehow?
>>
>> (I tried this but either it's not possible or my config isn't quite
>> right)
>>
>> Or need I create a separate channel (each on their own port??) (and
>> separate instance of flume???) for each file I want to store on the
>> sink?
>>
>> -Dave
>>
>>
>> -----Original Message-----
>> From: Brock Noland [mailto:[email protected]]
>> Sent: Tuesday, September 25, 2012 9:36 AM
>> To: [email protected]
>> Subject: Re: multiple sources single channel
>>
>> Hi,
>>
>> A source can write to multiple channels but a sink can only read from
>> a single channel. So in this case, you probably just want one channel
>> and then have both sources write to that single channel. Notice that
>> for sources it's ".channels = " while for sinks it's ".channel = "
>>
>> Cheers,
>> Brock
>>
>> On Tue, Sep 25, 2012 at 8:06 AM, Cochran, David M (Contractor)
>> <[email protected]> wrote:
>>
>> I'm getting closer, but still doing something wrong.. Do most folks
>> have this hard a time wrapping their heads around the configs or is
>> it just me?
>>
>> -Dave
>>
>> host#1 - tailing 2 source log files... send to host#2 over one
>> channel to sink FILE_ROLL to be written out.
>>
>> HOST#1 Results-
>> 25 Sep 2012 07:40:51,289 WARN [conf-file-poller-0]
>> (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)
>> - Configuration for : avroSink has errors, and will be removed:
>> org.apache.flume.conf.ConfigurationException: Channel fileChannel
>> fileChannel1 not in active set.
>>
>> HOST#2 Results-
>> 25 Sep 2012 06:45:11,568 WARN [conf-file-poller-0]
>> (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks:678)
>> - Configuration for : filesink1 has errors, and will be removed:
>> org.apache.flume.conf.ConfigurationException: Channel fileChannel1
>> not in active set.
>>
>> Config for each host follow:
>>
>> HOST#1-
>> node105.sources = tailsource tailsource1
>> node105.channels = fileChannel fileChannel1
>> node105.sinks = avroSink
>>
>> node105.sources.tailsource.type = exec
>> node105.sources.tailsource.command = tail -F /home/XXXX/test.log
>>
>> node105.sources.tailsource1.type = exec
>> node105.sources.tailsource1.command = tail -F /home/XXXX/test1.log
>>
>> node105.sources.tailsource.channels = fileChannel
>> node105.sources.tailsource1.channels = fileChannel1
>>
>> ## Sink sends avro messages to xxxx on port 9432
>> node105.sinks.avroSink.type = avro
>> node105.sinks.avroSink.channel = fileChannel fileChannel1
>> node105.sinks.avroSink.hostname = 192.168.1.128
>> node105.sinks.avroSink.port = 9432
>>
>> node105.channels.fileChannel.type = file
>> node105.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
>> node105.channels.fileChannel.dataDirs = /tmp/flume/data2
>> node105.channels.fileChannel.capacity = 10000
>> node105.channels.fileChannel.checkpointInterval = 3000
>> node105.channels.fileChannel.maxFileSize = 5242880
>>
>> node105.channels.fileChannel1.type = file
>> node105.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
>> node105.channels.fileChannel1.dataDirs = /tmp/flume/data22
>> node105.channels.fileChannel1.capacity = 10000
>> node105.channels.fileChannel1.checkpointInterval = 3000
>> node105.channels.fileChannel1.maxFileSize = 5242880
>>
>>
>> HOST#2 -
>> node102.sources = avroSource
>> node102.channels = fileChannel FileChannel1
>> node102.sinks = filesink filesink1
>>
>> ## Source listens for avro messages on port 9432 on all ips
>> node102.sources.avroSource.type = avro
>> node102.sources.avroSource.channels = fileChannel fileChannel1
>> node102.sources.avroSource.bind = 0.0.0.0
>> node102.sources.avroSource.port = 9432
>>
>> node102.sinks.filesink.type = FILE_ROLL
>> node102.sinks.filesink.batchSize = 1000
>> node102.sinks.filesink.channel = fileChannel
>> node102.sinks.filesink.sink.directory = /logs/rhel5/
>> node102.sinks.filesink.sink.rollInterval = 86400
>> node102.sinks.filesink.sink.serializer = TEXT
>>
>> node102.channels.fileChannel.type = file
>> node102.channels.fileChannel.checkpointDir = /tmp/flume/checkpoints
>> node102.channels.fileChannel.dataDirs = /tmp/flume/data1
>> node102.channels.fileChannel.capacity = 5000
>> node102.channels.fileChannel.checkpointInterval = 45000
>> node102.channels.fileChannel.maxFileSize = 5242880
>>
>> node102.sinks.filesink1.type = FILE_ROLL
>> node102.sinks.filesink1.batchSize = 1000
>> node102.sinks.filesink1.channel = fileChannel1
>> node102.sinks.filesink1.sink.directory = /logs/rhel51/
>> node102.sinks.filesink1.sink.rollInterval = 86400
>> node102.sinks.filesink1.sink.serializer = TEXT
>>
>> node102.channels.fileChannel1.type = file
>> node102.channels.fileChannel1.checkpointDir = /tmp/flume/checkpoints1
>> node102.channels.fileChannel1.dataDirs = /tmp/flume/data2
>> node102.channels.fileChannel1.capacity = 5000
>> node102.channels.fileChannel1.checkpointInterval = 45000
>> node102.channels.fileChannel1.maxFileSize = 5242880

--
Apache MRUnit - Unit testing MapReduce -
http://incubator.apache.org/mrunit/
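
For the scaling worry raised in the thread (many ports and many flume instances on the receiving box), one possible alternative is sketched below. It is not from the thread: it assumes a Flume version that ships the static interceptor and the multiplexing channel selector, and the header name "logfile" and its values "file1"/"file2" are made up for illustration. The idea is that the sender tags each event with the file it came from, ships everything over one avro port, and the receiver routes events to a per-file channel based on that tag.

```properties
# Sketch only. Header name "logfile" and values "file1"/"file2" are
# hypothetical; hostnames, ports and directories reuse the ones above.

# Sending agent (node105): two tails, one channel, one avro sink
node105.sources = source1 source2
node105.channels = channel1
node105.sinks = sink1

node105.sources.source1.type = exec
node105.sources.source1.command = tail -F file1
node105.sources.source1.channels = channel1
# Tag every event from file1 with logfile=file1
node105.sources.source1.interceptors = tag
node105.sources.source1.interceptors.tag.type = static
node105.sources.source1.interceptors.tag.key = logfile
node105.sources.source1.interceptors.tag.value = file1

node105.sources.source2.type = exec
node105.sources.source2.command = tail -F file2
node105.sources.source2.channels = channel1
# Tag every event from file2 with logfile=file2
node105.sources.source2.interceptors = tag
node105.sources.source2.interceptors.tag.type = static
node105.sources.source2.interceptors.tag.key = logfile
node105.sources.source2.interceptors.tag.value = file2

node105.channels.channel1.type = memory

node105.sinks.sink1.type = avro
node105.sinks.sink1.channel = channel1
node105.sinks.sink1.hostname = node102
node105.sinks.sink1.port = 8881

# Receiving agent (node102): one avro source, routed by header
node102.sources = source1
node102.channels = channel1 channel2
node102.sinks = sink1 sink2

node102.sources.source1.type = avro
node102.sources.source1.bind = 0.0.0.0
node102.sources.source1.port = 8881
node102.sources.source1.channels = channel1 channel2
# Route each event to a channel according to its "logfile" header
node102.sources.source1.selector.type = multiplexing
node102.sources.source1.selector.header = logfile
node102.sources.source1.selector.mapping.file1 = channel1
node102.sources.source1.selector.mapping.file2 = channel2

node102.channels.channel1.type = memory
node102.channels.channel2.type = memory

node102.sinks.sink1.type = FILE_ROLL
node102.sinks.sink1.channel = channel1
node102.sinks.sink1.sink.directory = /dir1
node102.sinks.sink1.sink.rollInterval = 86400

node102.sinks.sink2.type = FILE_ROLL
node102.sinks.sink2.channel = channel2
node102.sinks.sink2.sink.directory = /dir2
node102.sinks.sink2.sink.rollInterval = 86400
```

With this layout, adding another sending server or another log file would mean a new header value and a new mapping line on the receiver rather than a new port or a new flume instance. Treat it as a starting point to test, not a verified configuration.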
