Hi guys, after days of banging my head against the monitor I found out why it
was not working.

One key element when using *DynamicAvroDestinations* that is not described
in the documentation is that, if you are using a regular POJO as
*DestinationT* like I am (and not Long/String/Integer as in the example):

{code}
DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord>
{code}

It is very important to pay attention to the equals / hashCode implementations,
which should be aligned with your sharding/grouping/partition structure. Not
doing so will give you the result I described earlier (1 file (or shard)
with only 1 record, or sometimes just an exception).
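
For example (hypothetical values, just to illustrate the point with the
destination class shown further down):

{code}
// Hypothetical values, purely illustrative, using the destination class shown below.
GenericRecordDynamicDestination a = new GenericRecordDynamicDestination("tracking", 2018, 9, 20);
GenericRecordDynamicDestination b = new GenericRecordDynamicDestination("tracking", 2018, 9, 20);

// With Object's default identity-based equals/hashCode these two logically
// identical destinations are treated as distinct keys, so each element can end
// up with its own writer/file. With the overrides shown below, a.equals(b) is
// true, a.hashCode() == b.hashCode(), and the elements group together as expected.
{code}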

That said, I still don't understand why it depends on equals / hashCode,
because when I checked the class at
  *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*
the hashing depends on the Coder itself (method: <DestinationT> int
hashDestination(DestinationT destination, Coder<DestinationT>
destinationCoder)).
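
As far as I can tell the idea is roughly the following (my own paraphrase
just to illustrate, not the actual Beam source, which may hash the encoded
bytes differently):

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.beam.sdk.coders.Coder;

class DestinationHashingSketch {
    // Paraphrase of the idea: encode the destination with its Coder and hash the
    // resulting bytes, so the sharding key comes from the encoded form rather
    // than from the POJO's own hashCode().
    static <DestinationT> int hashDestinationSketch(
            final DestinationT destination,
            final Coder<DestinationT> destinationCoder) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        destinationCoder.encode(destination, bytes);
        return Arrays.hashCode(bytes.toByteArray());
    }
}
{code}

So on paper the sharding key should not care about my hashCode at all, which
is exactly why the behavior surprised me.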

Maybe a core member could explain the reason for it, or it's unexpected
behavior and there is a bug somewhere else.

In my case, below you can find my POJO destination along with the
corresponding Coder implementation, which works correctly as long as
equals / hashCode are implemented:

{code}
static class GenericRecordDynamicDestination {
        private String logicalType;
        private final int year;
        private final int month;
        private final int day;

        public GenericRecordDynamicDestination(
                final String _logicalType, final int _year, final int _month, final int _day) {
            logicalType = _logicalType;
            year = _year;
            month = _month;
            day = _day;
        }

        public String getLogicalType() {
            return logicalType;
        }

        public void setLogicalType(final String _logicalType) {
            logicalType = _logicalType;
        }

        public int getYear() {
            return year;
        }

        public int getMonth() {
            return month;
        }

        public int getDay() {
            return day;
        }

        @Override
        public boolean equals(final Object _o) {
            if (this == _o) return true;
            if (_o == null || getClass() != _o.getClass()) return false;

            final GenericRecordDynamicDestination that = (GenericRecordDynamicDestination) _o;

            if (year != that.year) return false;
            if (month != that.month) return false;
            if (day != that.day) return false;
            return logicalType.equals(that.logicalType);
        }

        @Override
        public int hashCode() {
            int result = logicalType.hashCode();
            result = 31 * result + year;
            result = 31 * result + month;
            result = 31 * result + day;
            return result;
        }
}

static class GenericRecordDestinationCoder extends CustomCoder<GenericRecordDynamicDestination> {
        @Override
        public void encode(final GenericRecordDynamicDestination value, final OutputStream outStream)
                throws IOException {
            final ObjectOutputStream out = new ObjectOutputStream(outStream);
            out.writeUTF(value.getLogicalType());
            out.writeInt(value.getYear());
            out.writeInt(value.getMonth());
            out.writeInt(value.getDay());
            out.flush();
        }

        @Override
        public GenericRecordDynamicDestination decode(final InputStream inStream) throws IOException {
            final ObjectInputStream in = new ObjectInputStream(inStream);
            String logicalType = in.readUTF();
            int year = in.readInt();
            int month = in.readInt();
            int day = in.readInt();
            return new GenericRecordDynamicDestination(logicalType, year, month, day);
        }

        @Override
        public void verifyDeterministic() throws NonDeterministicException {
            // Fields are always encoded in a fixed order, so this coder is deterministic.
        }
}
{code}
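
In case it helps somebody else: as far as I understand, one way to make sure
WriteFiles picks up this custom coder is to return it from
getDestinationCoder() in the DynamicAvroDestinations subclass. This is only a
trimmed sketch of that hook; the other required overrides (getDestination,
getDefaultDestination, getSchema, getFilenamePolicy, formatRecord) are
omitted, which is why the class is left abstract here:

{code}
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.DynamicAvroDestinations;

abstract class MyAvroDestinationsSketch
        extends DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord> {

    @Override
    public Coder<GenericRecordDynamicDestination> getDestinationCoder() {
        // Hand WriteFiles the custom coder shown above for the destination type.
        return new GenericRecordDestinationCoder();
    }
}
{code}

If getDestinationCoder() returns null, Beam tries to infer a coder for the
destination type from the CoderRegistry instead, as far as I can tell.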


On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <[email protected]>
wrote:

> I am writing a pipeline that will read from Kafka and convert the data
> into Avro files with fixed windows of 10min.
>
> I am using a *DynamicAvroDestinations* in order to build a dynamic path
> and select the corresponding schema based on the incoming data.
>
> 1.)
> While testing on my machine (with the DirectRunner) I am using a file
> (BoundedSource) containing hundreds of these messages to feed my
> pipeline; however, I have found that sometimes the pipeline fails
> with:
> {code}Caused by: java.nio.file.FileAlreadyExistsException:
> /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
> at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
> at java.nio.file.Files.copy(Files.java:1274)
> at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
> at
> org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
> at
> org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
> {code}
>
> 2.) Trying to group all related messages into a single Avro file
> (YYYYMMDD-HHMM): when the pipeline doesn't fail using the DirectRunner,
> the generated Avro files contain only 1 record per file.
>
> If I generate a pseudo-random name in *getFilenamePolicy*, everything
> works, but I end up with hundreds of files, each of them containing 1
> record.
>
> My pipeline steps are:
> -- ReadFromSource
> -- ApplyWindows
> (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
> -- Create a KV from the KafkaRecord (the key is the date as YYYYMMDDHHMM)
> -- GroupByKey (this returns a KV<Long, Iterable<String>>)
> -- Emit each record of the iterable as KV<Long, String>
> -- AvroIO ( AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords() )
>
> Also, please find below the code for getFilenamePolicy:
> {code}
> @Override
> public FileBasedSink.FilenamePolicy getFilenamePolicy(final GenericRecordDestination destination) {
>     return new FileBasedSink.FilenamePolicy() {
>         @Override
>         public ResourceId windowedFilename(final int shardNumber, final int numShards,
>                 final BoundedWindow window, final PaneInfo paneInfo,
>                 final FileBasedSink.OutputFileHints outputFileHints) {
>
>             StringBuilder path = new StringBuilder(filesLocation);
>             if (!filesLocation.endsWith("/")) {
>                 path.append("/");
>             }
>
>             path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
>                     .append("/")
>                     .append(filesPrefix)
>                     .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
>                     .append("-w-").append(window.maxTimestamp().getMillis())
>                     .append("-").append(shardNumber)
>                     .append("_").append(numShards)
>                     .append(AVRO_SUFFIX);
>
>             return FileSystems.matchNewResource(path.toString(), false);
>         }
>
>         @Nullable
>         @Override
>         public ResourceId unwindowedFilename(final int shardNumber, final int numShards,
>                 final FileBasedSink.OutputFileHints outputFileHints) {
>             throw new PipelineException("unwindowedFilename is not supported");
>         }
>     };
> }
> {code}
>
> Thanks
> --
>
> JC
>
>

-- 

JC
