Hi All,

I am using Flume to move Kafka messages to GCS (Google Cloud Storage). While
doing that, I observed some data loss in the pipeline.

Digging through the logs, I found that whenever I see data loss, the
"file close" step failed on the first attempt and the close was never
re-attempted. For the files whose close failed this way, I cannot find the
corresponding objects in GCS.

In contrast, whenever the file close was re-attempted after a failure, I can find the files in GCS.

*Is there any explanation for this behaviour? Can someone please recommend
a fix for this?*

The logs below show a case where the file close failed on the first attempt,
was never re-attempted, and the file is missing from GCS.
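
(For completeness, this is roughly how I check whether a given file ever
became visible in GCS. It is only a sketch using the plain Hadoop FileSystem
API, assuming the GCS connector is on the classpath and configured; the path
is the same placeholder used in the log below.)

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GcsFileCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder path; the real bucket/object name is redacted, as in the log below.
        String file = "gs://GCS-BUCKET/FILEPATH";

        // Picks up whatever GCS connector configuration (fs.gs.impl, credentials)
        // is available on the classpath / in core-site.xml.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);

        // exists() tells me whether the file from the failed close ever showed up as an object.
        System.out.println(file + " exists: " + fs.exists(new Path(file)));
    }
}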

*PFB Flume Logs:*

*NOTE: I have changed the actual file name present in the log.*

18 Dec 2020 19:16:19,368 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter$CloseHandler.close:348)  - Closing file: gs://GCS-BUCKET/FILEPATH failed. Will retry again in 180 seconds.

java.io.IOException: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 410 Gone
        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:367)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:238)
        at java.nio.channels.Channels$1.close(Channels.java:178)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:127)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at org.apache.flume.sink.hdfs.HDFSSequenceFile.close(HDFSSequenceFile.java:119)
        at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:319)
        at org.apache.flume.sink.hdfs.BucketWriter$3.call(BucketWriter.java:316)
        at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:727)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:724)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 410 Gone
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:150)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:512)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549)
        at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:301)
        ... 4 more

*Version Details:*

Flume: 1.9.0
GCS connector jar: gcs-connector-hadoop2-1.9.17-shaded.jar


*PFB Flume Config:*

agent.channels = dummyChannelName
agent.channels.dummyChannelName.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.dummyChannelName.kafka.bootstrap.servers = <Comma separated list of kafka servers>
agent.channels.dummyChannelName.kafka.topic=<kafkaTopicName>
agent.channels.dummyChannelName.kafka.consumer.group.id = <KafkaConsumerGroupId>
agent.channels.dummyChannelName.parseAsFlumeEvent=false
agent.channels.dummyChannelName.kafka.pollTimeout=10000
agent.channels.dummyChannelName.kafka.consumer.max.partition.fetch.bytes=20971520
agent.channels.dummyChannelName.capacity=10000
agent.channels.dummyChannelName.transactionCapacity=1000
agent.channels.dummyChannelName.kafka.consumer.auto.offset.reset=earliest

agent.sinks = gcp-sink

agent.sinks.gcp-sink.channel = dummyChannelName
agent.sinks.gcp-sink.type = hdfs
agent.sinks.gcp-sink.hdfs.path = gs://GCS-BUCKET-PATH/dt=%Y-%m-%d/hour=%k
agent.sinks.gcp-sink.hdfs.rollSize = 124000000
agent.sinks.gcp-sink.hdfs.rollInterval = 200
agent.sinks.gcp-sink.hdfs.rollCount = 0
agent.sinks.gcp-sink.hdfs.codeC = Lz4Codec
agent.sinks.gcp-sink.hdfs.fileType = SequenceFile
agent.sinks.gcp-sink.hdfs.writeFormat = Writable
agent.sinks.gcp-sink.hdfs.useLocalTimeStamp = true
agent.sinks.gcp-sink.hdfs.timeZone = UTC
agent.sinks.gcp-sink.hdfs.callTimeout = 600000
agent.sinks.gcp-sink.hdfs.filePrefix=FlumeFile
agent.sinks.gcp-sink.hdfs.closeTries = 3

*NOTE: I have changed the actual paths/IP addresses in the config.*
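
In case it helps: as I understand the Flume HDFS sink documentation, the close
retry behaviour is controlled by hdfs.closeTries together with
hdfs.retryInterval. I do not set hdfs.retryInterval, so I am assuming it is at
its documented 180-second default, which matches the "Will retry again in 180
seconds" message in the log above. A minimal sketch of what I believe the
retry-related part of my sink config amounts to:

# Sketch only; hdfs.retryInterval is an assumption (documented default), not in my actual config.
# Give up after three failed close/rename attempts.
agent.sinks.gcp-sink.hdfs.closeTries = 3
# Seconds between consecutive close attempts.
agent.sinks.gcp-sink.hdfs.retryInterval = 180
# Per the docs, hdfs.closeTries = 0 would keep retrying the close until it succeeds:
# agent.sinks.gcp-sink.hdfs.closeTries = 0

If adjusting these (for example, setting closeTries to 0) is the expected fix,
please let me know.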

Please let me know if I need to provide anything else. Any help is
appreciated.

Regards,
Shivanjal Arora
