Hi guys, after days of banging my head against the monitor, I found out why
it was not working.
One key element when using *DynamicAvroDestinations* that is not described
in the documentation concerns the case where you use a regular POJO as
*DestinationT*, like I do (rather than Long/String/Integer as in the example):
{code}
DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord>
{code}
It's very important to pay attention to the equals/hashCode implementations,
which should be aligned with your sharding/grouping/partitioning structure.
Not doing so gives you the result I described earlier (one file, or shard,
with only one record, or sometimes just an exception).
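Here is a small standalone sketch of why this matters. It assumes that some step keeps per-destination state (for example one open writer per destination) in an in-memory java.util.Map, which I have not confirmed in the Beam source. NaiveDestination, ValueDestination and groupByDestination are hypothetical names. Each record gets its own destination instance, as it would after being decoded from bytes:

{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical destination WITHOUT equals/hashCode: two instances with the
// same field values are still different map keys (identity semantics).
final class NaiveDestination {
    final String logicalType;
    final int year;
    final int month;
    final int day;

    NaiveDestination(String logicalType, int year, int month, int day) {
        this.logicalType = logicalType;
        this.year = year;
        this.month = month;
        this.day = day;
    }
}

// The same fields, but with value-based equals/hashCode.
final class ValueDestination {
    final String logicalType;
    final int year;
    final int month;
    final int day;

    ValueDestination(String logicalType, int year, int month, int day) {
        this.logicalType = logicalType;
        this.year = year;
        this.month = month;
        this.day = day;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ValueDestination)) return false;
        ValueDestination that = (ValueDestination) o;
        return year == that.year && month == that.month && day == that.day
                && logicalType.equals(that.logicalType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(logicalType, year, month, day);
    }
}

final class GroupingDemo {
    private GroupingDemo() {}

    // Groups records.get(i) under destinations.get(i) in a HashMap, the way a
    // "one writer per destination" cache might (an assumption about Beam).
    static <D> Map<D, List<String>> groupByDestination(List<D> destinations, List<String> records) {
        Map<D, List<String>> groups = new HashMap<>();
        for (int i = 0; i < records.size(); i++) {
            groups.computeIfAbsent(destinations.get(i), k -> new ArrayList<>()).add(records.get(i));
        }
        return groups;
    }
}
{code}

Without value-based equals/hashCode, three records for the same logical destination end up in three separate groups, which would match the one-record-per-file symptom.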
I still don't understand why it depends on equals/hashCode, because when I
checked the class *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn*
(line 688), the hashing there depends on the Coder itself (method:
<DestinationT> int hashDestination(DestinationT destination,
Coder<DestinationT> destinationCoder)).
Maybe a core member could explain the reason for this, or whether it's
unexpected behavior caused by a bug somewhere else.
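One possible reading is that the two observations do not contradict each other. In the sketch below, a hash computed from the encoded bytes stands in for the coder-based hashDestination. Arrays.hashCode over a DataOutputStream encoding is my own stand-in, not whatever hash Beam actually applies, and PlainDestination and EncodedHashDemo are hypothetical names. Two instances with identical fields get the same encoded hash, yet Object.equals still treats them as different:

{code}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical POJO that does NOT override equals/hashCode.
final class PlainDestination {
    final String logicalType;
    final int year;
    final int month;
    final int day;

    PlainDestination(String logicalType, int year, int month, int day) {
        this.logicalType = logicalType;
        this.year = year;
        this.month = month;
        this.day = day;
    }
}

final class EncodedHashDemo {
    private EncodedHashDemo() {}

    // Deterministic field-by-field encoding, like a coder would produce.
    static byte[] encode(PlainDestination d) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(d.logicalType);
            out.writeInt(d.year);
            out.writeInt(d.month);
            out.writeInt(d.day);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hash computed from the encoded bytes rather than from Object.hashCode().
    static int hashViaEncoding(PlainDestination d) {
        return Arrays.hashCode(encode(d));
    }
}
{code}

So a shard assignment based on the encoding can agree for both instances, while any HashMap keyed by the destination object itself would still split them.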
Below you can find my POJO destination along with the corresponding Coder
implementation, which works correctly as long as equals/hashCode are
implemented:
{code}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;

// Both classes below are static nested classes of the pipeline class.

// Destination used as DestinationT. Beam may keep destinations as keys in
// in-memory maps, so equals/hashCode must be value-based and consistent
// with the coder below.
static class GenericRecordDynamicDestination {
  // Final so the hashCode cannot change while the object is used as a key.
  private final String logicalType;
  private final int year;
  private final int month;
  private final int day;

  public GenericRecordDynamicDestination(final String _logicalType,
      final int _year, final int _month, final int _day) {
    logicalType = _logicalType;
    year = _year;
    month = _month;
    day = _day;
  }

  public String getLogicalType() {
    return logicalType;
  }

  public int getYear() {
    return year;
  }

  public int getMonth() {
    return month;
  }

  public int getDay() {
    return day;
  }

  @Override
  public boolean equals(final Object _o) {
    if (this == _o) return true;
    if (_o == null || getClass() != _o.getClass()) return false;
    final GenericRecordDynamicDestination that =
        (GenericRecordDynamicDestination) _o;
    // Compare exactly the fields the coder writes.
    return year == that.year
        && month == that.month
        && day == that.day
        && logicalType.equals(that.logicalType);
  }

  @Override
  public int hashCode() {
    int result = logicalType.hashCode();
    result = 31 * result + year;
    result = 31 * result + month;
    result = 31 * result + day;
    return result;
  }
}

// Coder for the destination. DataOutputStream/DataInputStream write and read
// the fields directly, without the per-call stream header ObjectOutputStream
// adds. The output stream is flushed but never closed, since Beam owns it.
static class GenericRecordDestinationCoder
    extends CustomCoder<GenericRecordDynamicDestination> {
  @Override
  public void encode(final GenericRecordDynamicDestination value,
      final OutputStream outStream) throws IOException {
    final DataOutputStream out = new DataOutputStream(outStream);
    out.writeUTF(value.getLogicalType());
    out.writeInt(value.getYear());
    out.writeInt(value.getMonth());
    out.writeInt(value.getDay());
    out.flush();
  }

  @Override
  public GenericRecordDynamicDestination decode(final InputStream inStream)
      throws IOException {
    final DataInputStream in = new DataInputStream(inStream);
    final String logicalType = in.readUTF();
    final int year = in.readInt();
    final int month = in.readInt();
    final int day = in.readInt();
    return new GenericRecordDynamicDestination(logicalType, year, month, day);
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    // Deterministic: the same destination always encodes to the same bytes.
  }
}
{code}
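Because the problem only shows up at runtime, a cheap guard is a unit test asserting that the coder and equals/hashCode agree. Below is a minimal sketch of such a check. The helper name and signature are my own, and it accepts any encoder function. In a Beam test, the encoder could be built from the coder's bytes (Beam's CoderUtils.encodeToByteArray should work for this, though it throws a checked CoderException that the lambda would need to wrap):

{code}
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical test helper: checks that "same encoded bytes" implies
// "equal objects with equal hash codes".
final class DestinationContract {
    private DestinationContract() {}

    static <T> boolean encodingConsistentWithEquals(T a, T b, Function<T, byte[]> encoder) {
        boolean sameBytes = Arrays.equals(encoder.apply(a), encoder.apply(b));
        if (!sameBytes) {
            // Different encodings: nothing to check for this pair.
            return true;
        }
        return a.equals(b) && a.hashCode() == b.hashCode();
    }
}
{code}

For example, two equal Strings pass. Two StringBuilders with the same content fail, because StringBuilder keeps identity equals, which is the same situation as a POJO destination without overrides.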
On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <[email protected]>
wrote:
> I am writing a pipeline that will read from Kafka and convert the data
> into Avro files using fixed windows of 10 minutes.
>
> I am using *DynamicAvroDestinations* in order to build a dynamic path
> and select the corresponding schema based on the incoming data.
>
> 1.)
> While testing on my machine (with the DirectRunner), I am using a file
> (BoundedSource) containing hundreds of these messages to feed my
> pipeline. However, I have found that sometimes the pipeline fails
> with:
> {code}Caused by: java.nio.file.FileAlreadyExistsException:
> /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
> at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
> at java.nio.file.Files.copy(Files.java:1274)
> at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
> at
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
> at
> org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
> {code}
>
> 2.) Trying to group all related messages into a single Avro file
> (YYYYMMDD-HHMM): when the pipeline doesn't fail on the DirectRunner,
> the generated Avro files contain only 1 record each.
>
> If I generate a pseudo-random name in *getFilenamePolicy*, everything
> works, but I end up with hundreds of files, each of them containing 1
> record.
>
> My pipeline steps are:
> -- ReadFromSource
> -- ApplyWindows (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
> -- Create a KV from the KafkaRecord (the key is the date as YYYYMMDDHHMM)
> -- GroupByKey (this returns a KV<Long, Iterable<String>>)
> -- Emit each record of the iterable as a KV<Long, String>
> -- AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())
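Here is a sketch of how the key for the "Create a KV" step could be computed. It assumes the key is derived from the record's epoch-millisecond timestamp in UTC. Both the time zone and the class name MinuteKey are assumptions:

{code}
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns a record timestamp into the YYYYMMDDHHMM Long key.
final class MinuteKey {
    // UTC is an assumption; use the zone the output paths are meant to reflect.
    private static final DateTimeFormatter KEY_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

    private MinuteKey() {}

    // e.g. 2018-09-11T10:05:30Z -> 201809111005
    static long fromEpochMillis(long epochMillis) {
        return Long.parseLong(KEY_FORMAT.format(Instant.ofEpochMilli(epochMillis)));
    }
}
{code}

With this layout, every record in the same minute shares one key, so the GroupByKey groups records per minute within each window.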
>
> Please also find below the code for getFilenamePolicy:
> {code}
> @Override
> public FileBasedSink.FilenamePolicy getFilenamePolicy(
>     final GenericRecordDestination destination) {
>   return new FileBasedSink.FilenamePolicy() {
>     @Override
>     public ResourceId windowedFilename(final int shardNumber, final int numShards,
>         final BoundedWindow window, final PaneInfo paneInfo,
>         final FileBasedSink.OutputFileHints outputFileHints) {
>
>       StringBuilder path = new StringBuilder(filesLocation);
>       if (!filesLocation.endsWith("/")) {
>         path.append("/");
>       }
>
>       path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
>           .append("/")
>           .append(filesPrefix)
>           .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
>           .append("-w-").append(window.maxTimestamp().getMillis())
>           .append("-").append(shardNumber)
>           .append("_").append(numShards)
>           .append(AVRO_SUFFIX);
>
>       return FileSystems.matchNewResource(path.toString(), false);
>     }
>
>     @Nullable
>     @Override
>     public ResourceId unwindowedFilename(final int shardNumber, final int numShards,
>         final FileBasedSink.OutputFileHints outputFileHints) {
>       throw new PipelineException("unwindowedFilename is not supported");
>     }
>   };
> }
> {code}
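To see which inputs the generated name depends on, here is the path logic of windowedFilename as a plain function. TrackingPaths.windowedPath is a hypothetical name, and the date parts are passed in pre-formatted instead of calling DateTimeUtil. Using filesPrefix "tracking" and AVRO_SUFFIX ".avro" (both inferred from the stack trace, not confirmed), it reproduces the exact path that already existed. Only the date, window, shard number and shard count go into the name:

{code}
// Hypothetical standalone version of the path logic in windowedFilename.
final class TrackingPaths {
    private TrackingPaths() {}

    // yearMonth and day stand in for DateTimeUtil.format(...) with
    // "yyyy/MM" and "dd" in the original (e.g. "2018/09", "11").
    static String windowedPath(String filesLocation, String filesPrefix,
                               String yearMonth, String day, long windowMaxMillis,
                               int shardNumber, int numShards, String suffix) {
        StringBuilder path = new StringBuilder(filesLocation);
        if (!filesLocation.endsWith("/")) {
            path.append('/');
        }
        path.append(yearMonth).append('/')
            .append(filesPrefix)
            .append("-day-").append(day)
            .append("-w-").append(windowMaxMillis)
            .append('-').append(shardNumber)
            .append('_').append(numShards)
            .append(suffix);
        return path.toString();
    }
}
{code}

So if the sink ever treats two destinations with the same date as different (for example because they are not equal under equals), both could be assigned the same final path. That would be consistent with, though not proof of, the FileAlreadyExistsException.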
>
> Thanks
> --
>
> JC
>
>
--
JC