I am writing a pipeline that reads from Kafka and converts the data into
Avro files, using fixed windows of 10 minutes.
I am using *DynamicAvroDestinations* to build a dynamic path and to
select the corresponding schema based on the incoming data.
1.)
While testing on my machine (with the DirectRunner), I feed the pipeline
from a file (a BoundedSource) containing hundreds of these messages.
However, I have found that the pipeline sometimes fails with:
{code}
Caused by: java.nio.file.FileAlreadyExistsException: /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
    at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
    at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
{code}
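For context on the top frames of that trace, here is a minimal standalone sketch (plain JDK, no Beam) of the behavior of java.nio.file.Files.copy when the target already exists and no REPLACE_EXISTING option is passed. The class name, the temp directory and the file names are hypothetical; the sketch only reproduces the JDK behavior and says nothing about why two temp files would be finalized to the same name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

final class CopyCollision {
    /** Copies two temp files to the same target; returns true if the second copy is rejected. */
    static boolean secondCopyFails() {
        try {
            Path dir = Files.createTempDirectory("copy-collision");
            Path tempA = Files.write(dir.resolve("temp-a"), "a".getBytes());
            Path tempB = Files.write(dir.resolve("temp-b"), "b".getBytes());
            // Hypothetical final name that both temp files resolve to.
            Path target = dir.resolve("out.avro");

            Files.copy(tempA, target); // first copy succeeds, target is created
            try {
                Files.copy(tempB, target); // no REPLACE_EXISTING, target exists
                return false;
            } catch (FileAlreadyExistsException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second copy rejected: " + secondCopyFails());
    }
}
```

So the exception by itself only says that some output name was produced more than once during finalization.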
2.)
I am trying to group all related messages into a single Avro file
(YYYYMMDD-HHMM). When the pipeline doesn't fail with the DirectRunner,
the generated Avro files contain only 1 record per file.
If I generate a pseudo-random name in *getFilenamePolicy*, everything
works, but I end up with hundreds of files, each of them containing 1
record.
My pipeline steps are:
-- ReadFromSource
-- ApplyWindows
(FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
-- Create a KV from each KafkaRecord (the key is the date as YYYYMMDDHHMM)
-- GroupByKey (this returns a KV<Long, Iterable<String>>)
-- Emit each record of the iterable as a KV<Long, String>
-- AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())
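To make the key and window steps concrete, here is a rough plain-Java sketch (no Beam) of the two values involved: the YYYYMMDDHHMM key and the bounds of a 10-minute fixed window. It assumes UTC, a zero window offset and non-negative epoch-millisecond timestamps; the class and method names are hypothetical, and this is not Beam's own implementation of FixedWindows.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class WindowKeySketch {
    // UTC is an assumption; the real pipeline may use another zone.
    private static final DateTimeFormatter KEY_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

    /** Key for the KV step: the record time as a yyyyMMddHHmm number. */
    static long minuteKey(long epochMillis) {
        return Long.parseLong(KEY_FORMAT.format(Instant.ofEpochMilli(epochMillis)));
    }

    /** Start of the fixed window containing the timestamp (zero offset assumed). */
    static long windowStart(long epochMillis, long windowMillis) {
        return epochMillis - Math.floorMod(epochMillis, windowMillis);
    }

    /** Last timestamp inside the window: one millisecond before its end. */
    static long windowMaxTimestamp(long epochMillis, long windowMillis) {
        return windowStart(epochMillis, windowMillis) + windowMillis - 1;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long first = Instant.parse("2018-09-11T10:07:30Z").toEpochMilli();
        long second = Instant.parse("2018-09-11T10:08:30Z").toEpochMilli();

        // Two records one minute apart: different keys, same 10-minute window.
        System.out.println("key 1: " + minuteKey(first));
        System.out.println("key 2: " + minuteKey(second));
        System.out.println("window max 1: "
            + Instant.ofEpochMilli(windowMaxTimestamp(first, tenMinutes)));
        System.out.println("window max 2: "
            + Instant.ofEpochMilli(windowMaxTimestamp(second, tenMinutes)));
    }
}
```

With a minute-level key and a 10-minute window, one window can hold up to ten distinct keys.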
The code for getFilenamePolicy is below:
{code}
@Override
public FileBasedSink.FilenamePolicy getFilenamePolicy(final GenericRecordDestination destination) {
    return new FileBasedSink.FilenamePolicy() {
        @Override
        public ResourceId windowedFilename(final int shardNumber, final int numShards,
                final BoundedWindow window, final PaneInfo paneInfo,
                final FileBasedSink.OutputFileHints outputFileHints) {
            // Builds:
            // <filesLocation>/yyyy/MM/<filesPrefix>-day-dd-w-<window max millis>-<shard>_<numShards><AVRO_SUFFIX>
            StringBuilder path = new StringBuilder(filesLocation);
            if (!filesLocation.endsWith("/")) {
                path.append("/");
            }
            path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
                .append("/")
                .append(filesPrefix)
                .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
                .append("-w-").append(window.maxTimestamp().getMillis())
                .append("-").append(shardNumber)
                .append("_").append(numShards)
                .append(AVRO_SUFFIX);
            // false: the resource is a file, not a directory
            return FileSystems.matchNewResource(path.toString(), false);
        }

        @Nullable
        @Override
        public ResourceId unwindowedFilename(final int shardNumber, final int numShards,
                final FileBasedSink.OutputFileHints outputFileHints) {
            throw new PipelineException("unwindowedFilename is not supported");
        }
    };
}
{code}
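To show which inputs the generated name depends on, here is a standalone sketch (plain Java, no Beam) of the same string concatenation. Several things are assumptions: DateTimeUtil.format is replaced by java.time formatters in UTC, the destination timestamp is taken to be epoch milliseconds, AVRO_SUFFIX is taken to be ".avro" (as the file name in the stack trace suggests), and the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class FilenameSketch {
    private static final String AVRO_SUFFIX = ".avro"; // assumed value
    private static final DateTimeFormatter YEAR_MONTH =
        DateTimeFormatter.ofPattern("yyyy/MM").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("dd").withZone(ZoneOffset.UTC);

    /** Mirrors the concatenation in windowedFilename, returning a plain String. */
    static String windowedName(String filesLocation, String filesPrefix,
            long destinationMillis, long windowMaxMillis, int shardNumber, int numShards) {
        Instant destinationTime = Instant.ofEpochMilli(destinationMillis);
        StringBuilder path = new StringBuilder(filesLocation);
        if (!filesLocation.endsWith("/")) {
            path.append("/");
        }
        path.append(YEAR_MONTH.format(destinationTime))
            .append("/")
            .append(filesPrefix)
            .append("-day-").append(DAY.format(destinationTime))
            .append("-w-").append(windowMaxMillis)
            .append("-").append(shardNumber)
            .append("_").append(numShards)
            .append(AVRO_SUFFIX);
        return path.toString();
    }

    public static void main(String[] args) {
        long windowMax = -9223372036200001L; // value taken from the stack trace
        long morning = Instant.parse("2018-09-11T10:07:30Z").toEpochMilli();
        long evening = Instant.parse("2018-09-11T18:42:00Z").toEpochMilli();

        String a = windowedName("/tmp/test-tracking", "tracking", morning, windowMax, 0, 4);
        String b = windowedName("/tmp/test-tracking", "tracking", evening, windowMax, 0, 4);
        System.out.println(a);
        System.out.println(b);
        System.out.println("same name: " + a.equals(b));
    }
}
```

In this sketch the name varies only with the destination's year, month and day, the window's max timestamp, the shard number and the shard count; the hour and minute of the destination do not appear in it. This is not necessarily the cause of the exception above, it only shows the inputs of the name.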
Thanks
--
JC