Hi

What is the implication of the property "hdfs.callTimeout"? What adverse
effect might it have if I change it?

I am getting a timeout exception:
Noted checkpoint for file: /home/hadoop/flume_channel/dataDir15/log-21, id: 21, checkpoint position: 1576210481
12/10/03 23:19:45 INFO file.LogFile: Closing /home/hadoop/flume_channel/dataDir15/log-21
12/10/03 23:19:55 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: Callable timed out
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:714)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:736)
Caused by: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:336)
        ... 5 more


My configuration is:

Agent A: Source
==========

adServerAgent.sources = execSource
adServerAgent.channels = fileChannel
adServerAgent.sinks = avro-forward-sink1
#adServerAgent.sinkgroups = failover_group

# For each one of the sources, the type is defined
adServerAgent.sources.execSource.type = exec
adServerAgent.sources.execSource.command = /usr/bin/perl /home/http/flume/scripts/logtailDir_trial.pl 2>/tmp/logtail_failure.log
adServerAgent.sources.execSource.restart=false
adServerAgent.sources.execSource.batchSize = 1000

# The channel can be defined as follows.
adServerAgent.sources.execSource.channels = fileChannel

# Each sink's type must be defined
adServerAgent.sinks.avro-forward-sink1.type = avro
adServerAgent.sinks.avro-forward-sink1.hostname=10.0.17.3
adServerAgent.sinks.avro-forward-sink1.port=10012
adServerAgent.sinks.avro-forward-sink1.connect-timeout = 300000

#Specify the channel the sink should use
adServerAgent.sinks.avro-forward-sink1.channel = fileChannel
adServerAgent.channels.fileChannel.type=file
adServerAgent.channels.fileChannel.dataDirs=/home/http/flume/channel/dataDir_trial
adServerAgent.channels.fileChannel.checkpointDir=/home/http/flume/channel/checkpointDir_trial
adServerAgent.channels.fileChannel.write-timeout=30

Here, the script used by the exec source just cats the files in the given directory.
Exec script is
========
# Feeder script for the Flume exec source.
#
# On every pass it runs logtail2 on each regular file found in $DIR (passing
# the file with -f and a per-file "<name>.offset" path under $OFFSET_DIR with
# -o) and copies whatever logtail2 prints to STDOUT.  It then sleeps for
# $SLEEP_TIME seconds and starts over.  The loop never terminates on its own;
# the script dies only when $DIR cannot be opened.
use strict;
use warnings;

my $DIR = "/TRACKING_FILES/backuped";
#my $MOVED_DIR = "";
my $OFFSET_DIR = "$ENV{'HOME'}/flume/offset_dir_trial";
my $SLEEP_TIME = 145;    # seconds to wait between two passes over $DIR
my $LOGATAIL_CMD = "$ENV{'HOME'}/flume/logtail_install/usr/sbin/logtail2";
################

while (1)
{
        # Lexical directory handle, so it cannot be confused with $DIR or
        # clash with any other bareword handle.
        opendir(my $dh, $DIR) or die "Couldn't open dir $DIR. $!";
        while (my $file = readdir($dh))
        {
                # Only plain files are tailed; this also skips "." and "..".
                # To restrict by name as well, additionally test something like
                #   $file =~ m/\d+impressionthread\d+\.tsv/
                next unless -f "$DIR/$file";

                # List-form pipe open runs logtail2 directly, without a shell,
                # so file names containing spaces or shell metacharacters are
                # passed through unchanged.
                my $tail;
                unless (open($tail, '-|', $LOGATAIL_CMD,
                             '-f', "$DIR/$file",
                             '-o', "$OFFSET_DIR/$file.offset"))
                {
                        # Best effort: report the failure and move on to the
                        # next file instead of stopping the whole feeder.
                        warn "Couldn't run $LOGATAIL_CMD for $DIR/$file. $!\n";
                        next;
                }

                # Stream the output line by line rather than slurping it.
                print while <$tail>;

                # close() on a pipe waits for the child and is false when the
                # command exited with a non-zero status.
                close($tail)
                        or warn "$LOGATAIL_CMD failed for $DIR/$file (status $?)\n";
        }
        closedir($dh);
        sleep($SLEEP_TIME);
}


Agent B is: (Destination)
==============

adServerAgent.sources = avro-collection-source
adServerAgent.channels = fileChannel
adServerAgent.sinks = hdfsSink fileSink

# For each one of the sources, the type is defined
adServerAgent.sources.avro-collection-source.type=avro
adServerAgent.sources.avro-collection-source.bind=10.0.17.3
adServerAgent.sources.avro-collection-source.port=10012
adServerAgent.sources.avro-collection-source.interceptors = ts
adServerAgent.sources.avro-collection-source.interceptors.ts.type = timestamp
# The channel can be defined as follows.
adServerAgent.sources.avro-collection-source.channels = fileChannel

# Each sink's type must be defined
adServerAgent.sinks.hdfsSink.type = hdfs
adServerAgent.sinks.hdfsSink.hdfs.path= hdfs://mltest2001.pubmatic.com/flume/experiments_1_machine
#adServerAgent.sinks.hdfsSink.hdfs.path=hdfs://mltest2001.pubmatic.com/flume/trackers

#adServerAgent.sinks.hdfsSink.hdfs.fileType =DataStream
adServerAgent.sinks.hdfsSink.hdfs.fileType =CompressedStream
adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_%Y%m%d_%H%M%S_
#adServerAgent.sinks.hdfsSink.hdfs.filePrefix=adtrack_backup_
adServerAgent.sinks.hdfsSink.hdfs.rollSize=100000000
adServerAgent.sinks.hdfsSink.hdfs.codeC=bzip2
adServerAgent.sinks.hdfsSink.hdfs.rollCount=20000
adServerAgent.sinks.hdfsSink.hdfs.batchSize=1
#adServerAgent.sinks.hdfsSink.hdfs.writeFormat=Text
adServerAgent.sinks.hdfsSink.hdfs.rollInterval=600
adServerAgent.sinks.hdfsSink.hdfs.txnEventMax=1
#adServerAgent.sinks.hdfsSink.hdfs.maxOpenFiles=20000


#Define file sink
adServerAgent.sinks.fileSink.type = file_roll
adServerAgent.sinks.fileSink.sink.directory = /home/hadoop/flume_sink
adServerAgent.sinks.hdfsSink.channel= fileChannel
#adServerAgent.sinks.fileSink.channel = fileChannel

# Each channel's type is defined.
adServerAgent.channels.fileChannel.type=file
adServerAgent.channels.fileChannel.dataDirs=/home/hadoop/flume_channel/dataDir15
adServerAgent.channels.fileChannel.checkpointDir=/home/hadoop/flume_channel/checkpointDir15
adServerAgent.channels.fileChannel.write-timeout=30


Regards,
Jagadish

Reply via email to