I was able to resolve the “unable to deserialize FileBasedSink” error by adding withNumShards():
inputData.apply(FileIO.<String, GenericRecord>writeDynamic()
    .by(record -> "test")
    .withDestinationCoder(StringUtf8Coder.of())
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withNumShards(10)
    .withNaming(new SimpleFunction<String, FileIO.Write.FileNaming>() {
      @Override
      public FileIO.Write.FileNaming apply(String input) {
        return FileIO.Write.relativeFileNaming(
            ValueProvider.StaticValueProvider.of(outputPath + "/" + input),
            naming);
      }
    }));
Now I am seeing a new error, shown below. Is this related to
https://issues.apache.org/jira/browse/BEAM-9868? I don’t quite understand what
this error means. Please advise.
Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:709)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:392)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:377)
    at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:87)
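One thing I plan to try: the anonymous SimpleFunction above captures the enclosing class (and its naming field), and if that enclosing object is not serializable the DoFn cannot be round-tripped. A rough sketch of a capture-free variant, assuming outputPath is a plain String and using Beam's built-in defaultNaming as a stand-in for my naming field:

// Sketch only: a lambda captures just the locals it references, not the
// enclosing "this", so nothing non-serializable should be dragged along.
final String path = outputPath;
inputData.apply(FileIO.<String, GenericRecord>writeDynamic()
    .by(record -> "test")
    .withDestinationCoder(StringUtf8Coder.of())
    .via(ParquetIO.sink(inputAvroSchema))
    .to(path)
    .withNumShards(10)
    .withNaming((String dest) -> FileIO.Write.relativeFileNaming(
        ValueProvider.StaticValueProvider.of(path + "/" + dest),
        // defaultNaming stands in for the "naming" field above.
        FileIO.Write.defaultNaming("output", ".parquet"))));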
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 4, 2021 at 11:52 AM
To: "[email protected]" <[email protected]>, Kobe Feng
<[email protected]>
Cc: Yuchu Cao <[email protected]>
Subject: Re: Does writeDynamic() support writing different element groups to
different output paths?
I tried the code below:
inputData.apply(FileIO.<String, GenericRecord>writeDynamic()
    .by(record -> "test")
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withNaming(new SimpleFunction<String, FileIO.Write.FileNaming>() {
      @Override
      public FileIO.Write.FileNaming apply(String input) {
        return FileIO.Write.relativeFileNaming(
            ValueProvider.StaticValueProvider.of(outputPath + "/" + input),
            naming);
      }
    })
    .withDestinationCoder(StringUtf8Coder.of()));
Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize FileBasedSink
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.sinkFromProto(WriteFilesTranslation.java:125)
    at org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.getSink(WriteFilesTranslation.java:137)
    at org.apache.beam.runners.direct.WriteWithShardingFactory.getReplacementTransform(WriteWithShardingFactory.java:69)
    at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:564)
    at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:299)
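To narrow down which object actually fails to serialize, I may also try round-tripping the suspect function through Beam's SerializableUtils before building the pipeline (a hypothetical diagnostic; namingFn stands for the naming function above):

import org.apache.beam.sdk.util.SerializableUtils;

// Eagerly serialize and deserialize the function so the failure surfaces
// immediately, pointing at the offending object instead of the translator.
SerializableUtils.ensureSerializable(namingFn);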
When I switch to the write() API as below, it works fine. Does anyone have any
ideas? Thanks!
inputData.apply(FileIO.<GenericRecord>write()
    .withNumShards(10)
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withSuffix(".parquet"));
From: Tao Li <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, March 4, 2021 at 9:36 AM
To: "[email protected]" <[email protected]>, Kobe Feng
<[email protected]>
Cc: Yuchu Cao <[email protected]>
Subject: Re: Does writeDynamic() support writing different element groups to
different output paths?
Thanks Kobe, let me give it a try!
From: Kobe Feng <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, March 3, 2021 at 9:33 PM
To: "[email protected]" <[email protected]>
Cc: Yuchu Cao <[email protected]>
Subject: Re: Does writeDynamic() support writing different element groups to
different output paths?
I used the following approach a long time ago for writing into partitions in HDFS
(there may be better solutions from others), and I'm not sure whether any
interfaces have changed since then, which you'll need to check:
val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix,
  FileIO.writeDynamic[String, GenericRecord]()
    .by(recordPartition.partitionFunc)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(DarwinParquetIO.sink(...))
    .to(baseDir)
    ...
    .withNaming((partitionFolder: String) =>
      relativeFileNaming(
        StaticValueProvider.of[String](baseDir + Path.SEPARATOR + partitionFolder),
        fileNaming))
    ...
val partitionFunc: T => String
A good practice is to auto-switch: use the event-time field from the record value
for partitioning when event-time windowing is in play, and fall back to processing
time otherwise. partitionFunc can also combine multiple partition columns into
subdirectories based on your file system's path separator, e.g. for S3.
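For the hourly folders you asked about, the partition function can simply format the record's event-time field; in Java it might look like this (a sketch only; "eventTime" is a made-up epoch-millis field name, adapt it to your schema):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;

// Sketch: map each record to an hourly subfolder like "2021-03-04/11".
// "eventTime" is a hypothetical epoch-millis field; use your real one.
SerializableFunction<GenericRecord, String> hourlyPartition =
    record -> DateTimeFormatter.ofPattern("yyyy-MM-dd/HH")
        .withZone(ZoneOffset.UTC)
        .format(Instant.ofEpochMilli((Long) record.get("eventTime")));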
On Wed, Mar 3, 2021 at 5:36 PM Tao Li <[email protected]> wrote:
Hi Beam community,
I have a streaming app that writes each hour’s data to a folder named after that
hour. With Flink (for example), we can leverage the “Bucketing File Sink”:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/filesystem_sink.html
However, I am not seeing that Beam FileIO’s writeDynamic API supports specifying
different output paths for different groups:
https://beam.apache.org/releases/javadoc/2.28.0/index.html?org/apache/beam/sdk/io/FileIO.html
It seems like writeDynamic() only supports specifying a different naming strategy.
How can I specify hourly output paths for hourly data with Beam writeDynamic()?
Please advise. Thanks!
--
Yours Sincerely
Kobe Feng