Hi,
I am trying to upload Avro records to AWS S3 using StreamingFileSink.
The Avro file is created and uploaded with valid data, but I want to add a
rolling policy that rolls the part file after a specific time or once the part
file reaches a given size.
With forBulkFormat() I can only use a CheckpointRollingPolicy, which
automatically rolls all part files on every checkpoint.
Although CheckpointRollingPolicy lets us override 'shouldRollOnEvent' and
'shouldRollOnProcessingTime', the sink does not behave according to the
overridden logic and simply rolls on each checkpoint.
- The 'shouldRollOnProcessingTime' method is not invoked for each streaming
message.
- In 'shouldRollOnEvent', the part file size always shows a fixed value (the size
of a single message) rather than the accumulated part file size.
Below is the code snippet:
/**
 * Bulk-format (Avro object container file) sink that writes GenericRecords to
 * `avroOutputPath`, bucketed by a configured prefix plus the current
 * processing time (HHMM).
 *
 * Rolling behaviour:
 *  - With forBulkFormat() only a CheckpointRollingPolicy can be supplied, and
 *    part files are rolled on every checkpoint regardless of the overrides
 *    below. The size/time checks can only roll a file *earlier* than the next
 *    checkpoint. To get bigger or longer-lived files, increase the checkpoint
 *    interval.
 *  - shouldRollOnProcessingTime is driven by a processing-time timer (the
 *    builder's bucket check interval), not called once per record.
 */
val avroOcfFilesink: StreamingFileSink[GenericRecord] = {
  // Read once while building the job (fails fast if the key is missing)
  // instead of on every incoming record.
  val bucketIdPrefix: String = configurationParameters.getRequired("s3.bucket.id.prefix")

  // Roll a part file once at least this many bytes have reached its output stream.
  val rollSizeBytes: Int = 8 * 1024
  // Roll a part file once it has been open for at least 10 minutes.
  val rollIntervalMs: Long = 600000L
  // DataFileWriter buffers appended records and only writes a block to `out`
  // when the sync interval is reached (64000 bytes by default), so the size
  // seen by the rolling policy does not grow until then. Keep the sync
  // interval below the roll size so the size check can actually fire.
  val avroSyncIntervalBytes: Int = rollSizeBytes / 4

  StreamingFileSink
    .forBulkFormat(
      new Path(avroOutputPath),
      new AvroWriterFactory[GenericRecord](new AvroBuilder[GenericRecord]() {
        override def createWriter(out: OutputStream): DataFileWriter[GenericRecord] = {
          val schema: Schema = new Schema.Parser().parse(faultCodeOCFRecordSchema)
          // GenericDatumWriter is the datum writer intended for GenericRecord input.
          val datumWriter =
            new org.apache.avro.generic.GenericDatumWriter[GenericRecord](schema)
          val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
          //dataFileWriter.setCodec(CodecFactory.snappyCodec)
          // Must be configured before create() writes the file header.
          dataFileWriter.setSyncInterval(avroSyncIntervalBytes)
          dataFileWriter.create(schema, out)
          dataFileWriter
        }
      })
    )
    .withBucketAssigner(new BucketAssigner[GenericRecord, String] {
      // Use the sink's processing-time clock rather than the raw wall clock.
      override def getBucketId(in: GenericRecord, context: Context): String =
        bucketIdPrefix +
          TimeConversion.convertTimestampToRunDate_HHMM(context.currentProcessingTime())

      override def getSerializer: SimpleVersionedSerializer[String] =
        SimpleVersionedStringSerializer.INSTANCE
    })
    .withRollingPolicy(new CheckpointRollingPolicy[GenericRecord, String] {
      // Checked for incoming records against the current in-progress part file.
      override def shouldRollOnEvent(
          partFileState: PartFileInfo[String],
          element: GenericRecord): Boolean = {
        println("partFileState.getSize:" + partFileState.getSize)
        partFileState.getSize >= rollSizeBytes
      }

      // Checked periodically by a timer, not once per record.
      override def shouldRollOnProcessingTime(
          partFileState: PartFileInfo[String],
          currentTime: Long): Boolean = {
        val ageMs = currentTime - partFileState.getCreationTime
        println("currentTime:" + currentTime + " , partFileState.getCreationTime" +
          partFileState.getCreationTime + ", Diff:" + ageMs)
        ageMs >= rollIntervalMs
      }
    })
    .build()
}
avroRecordStream.addSink(avroOcfFilesink).setParallelism(1).name("AvroToS3Bucket")