Side note: please consider using FileIO.write() instead; it has a much more
pleasant syntax, even though the underlying implementation is the same and it
wouldn't avoid this particular pitfall.
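
For reference, here is a minimal sketch of what a FileIO-based equivalent
could look like for dynamic Avro destinations ("records", "schema", the key
function, and the paths are illustrative placeholders, not code from this
thread):

{code}
// Sketch only: dynamic destinations via FileIO.writeDynamic() + AvroIO.sink().
PCollection<GenericRecord> records = ...;  // upstream collection (placeholder)
Schema schema = ...;                       // Avro schema (placeholder)

records.apply(FileIO.<String, GenericRecord>writeDynamic()
    .by(r -> r.get("date").toString())            // destination key per element
    .withDestinationCoder(StringUtf8Coder.of())   // deterministic key encoding
    .via(AvroIO.sink(schema))
    .to("/tmp/output")
    .withNaming(key -> FileIO.Write.defaultNaming("tracking-" + key, ".avro"))
    .withNumShards(1));
{code}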

On Wed, Sep 26, 2018, 8:53 AM Juan Carlos Garcia <[email protected]>
wrote:

> Hi Tim, thanks for the explanation; it makes much more sense now why it was
> failing. :)
>
> I opened a Jira ticket https://issues.apache.org/jira/browse/BEAM-5511
> for this matter.
>
> Thanks,
> JC
>
>
> On Wed, Sep 26, 2018 at 1:40 PM Tim Robertson <[email protected]>
> wrote:
>
>> Hi Juan
>>
>> Well done for diagnosing your issue and thank you for taking the time to
>> report it here.
>>
>> I'm not the author of this section, but I've taken a quick look at the
>> code and the inline comments, and I have some observations which I think
>> might help explain it.
>>
>> I notice it writes into temporary files and uses a HashMap<DestinationT,
>> Writer> to maintain a pool of writers, one per destination. I presume that
>> you are receiving a new instance of the DestinationT object on each call,
>> and without equals/hashCode the HashMap treats these as separate entries,
>> so a new writer is created for each one. The method responsible for
>> providing the DestinationT is the following from FileBasedSink, which does
>> document the expectation:
>>
>> /**
>>  * Returns an object that represents at a high level the destination being
>>  * written to. May not return null. A destination must have deterministic
>>  * hash and equality methods defined.
>>  */
>> public abstract DestinationT getDestination(UserT element);
>>
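>> To make that concrete, here is a tiny illustrative sketch (MyDestination
>> and the writer values are hypothetical stand-ins, not Beam classes) of why
>> a DestinationT without equals/hashCode defeats the writer pool: the
>> HashMap falls back to identity semantics, so a logically identical key is
>> still treated as new:
>>
>> {code}
>> // MyDestination: a hypothetical POJO that does NOT override equals/hashCode.
>> Map<MyDestination, Writer> writers = new HashMap<>();
>> MyDestination a = new MyDestination("tracking", 2018, 9, 26);
>> MyDestination b = new MyDestination("tracking", 2018, 9, 26); // logically equal
>>
>> writers.put(a, writerForA);
>> // Identity semantics: b is a different instance, so the lookup misses and
>> // a brand-new writer (and temp file) gets created for the "same" key.
>> writers.containsKey(b); // returns false
>> {code}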
>>
>> Beyond that, I notice it also relies on a hash of the serialised object
>> (i.e. after running through the coder), which you note too. The inline doc
>> explains the reasoning: Java's hashCode is not guaranteed to be stable
>> across machines, and when elements are processed on different machines we
>> need deterministic behaviour to direct them to the correct target shard. To
>> achieve that, the code uses the murmur3_32 algorithm, which is stable
>> across machines (today I learnt!), applied to the encoded bytes of the
>> object, which are required to be deterministic.
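>>
>> For reference, the hashing looks roughly like this (a paraphrase of
>> WriteFiles.hashDestination using Guava's Hashing and Beam's CoderUtils;
>> not the verbatim source):
>>
>> {code}
>> static <DestinationT> int hashDestination(
>>     DestinationT destination, Coder<DestinationT> destinationCoder)
>>     throws IOException {
>>   // Encode first so every worker sees identical bytes, then hash with
>>   // murmur3_32, which yields the same value on every JVM and machine.
>>   byte[] bytes = CoderUtils.encodeToByteArray(destinationCoder, destination);
>>   return Hashing.murmur3_32().hashBytes(bytes).asInt();
>> }
>> {code}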
>>
>> I agree that we should improve the documentation and state that hashCode
>> and equals need to be implemented when user-defined objects are used for
>> the dynamic destination. Would you mind opening a Jira for that, please?
>>
>> I hope this helps a little, and thanks again
>> Tim
>>
>>
>> On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia <[email protected]>
>> wrote:
>>
>>> Hi guys, after days of banging my head against the monitor, I found out
>>> why it was not working.
>>>
>>> One key element when using *DynamicAvroDestinations* that is not described
>>> in the documentation: if you are using a regular POJO as *DestinationT*
>>> like I am (and not Long/String/Integer as in the example):
>>>
>>> {code}
>>> DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord>
>>> {code}
>>>
>>> It's very important to pay attention to the equals / hashCode
>>> implementations, which should be aligned with your
>>> sharding/grouping/partition structure. Not doing so will give you the
>>> result I described earlier (1 file (or shard) with only 1 record, or
>>> sometimes just an exception).
>>>
>>> That said, I still don't understand why it depends on equals / hashCode.
>>> When I checked the class at:
>>>   *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*
>>>
>>> the hashing depends on the Coder itself (method: <DestinationT> int
>>> hashDestination(DestinationT destination, Coder<DestinationT>
>>> destinationCoder)).
>>>
>>> Maybe a core member could explain the reason for it, or it's unexpected
>>> behavior and there is a bug somewhere else.
>>>
>>> In my case, below you can find my POJO destination along with the
>>> corresponding Coder implementation, which works correctly as long as
>>> equals / hashCode are implemented:
>>>
>>> {code}
>>> static class GenericRecordDynamicDestination {
>>>     private String logicalType;
>>>     private final int year;
>>>     private final int month;
>>>     private final int day;
>>>
>>>     public GenericRecordDynamicDestination(final String _logicalType,
>>>             final int _year, final int _month, final int _day) {
>>>         logicalType = _logicalType;
>>>         year = _year;
>>>         month = _month;
>>>         day = _day;
>>>     }
>>>
>>>     public String getLogicalType() {
>>>         return logicalType;
>>>     }
>>>
>>>     public void setLogicalType(final String _logicalType) {
>>>         logicalType = _logicalType;
>>>     }
>>>
>>>     public int getYear() {
>>>         return year;
>>>     }
>>>
>>>     public int getMonth() {
>>>         return month;
>>>     }
>>>
>>>     public int getDay() {
>>>         return day;
>>>     }
>>>
>>>     // equals/hashCode must cover exactly the fields that define a
>>>     // destination, otherwise the writer pool opens one file per element.
>>>     @Override
>>>     public boolean equals(final Object _o) {
>>>         if (this == _o) return true;
>>>         if (_o == null || getClass() != _o.getClass()) return false;
>>>
>>>         final GenericRecordDynamicDestination that =
>>>                 (GenericRecordDynamicDestination) _o;
>>>
>>>         if (year != that.year) return false;
>>>         if (month != that.month) return false;
>>>         if (day != that.day) return false;
>>>         return logicalType.equals(that.logicalType);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>         int result = logicalType.hashCode();
>>>         result = 31 * result + year;
>>>         result = 31 * result + month;
>>>         result = 31 * result + day;
>>>         return result;
>>>     }
>>> }
>>>
>>> static class GenericRecordDestinationCoder
>>>         extends CustomCoder<GenericRecordDynamicDestination> {
>>>     @Override
>>>     public void encode(final GenericRecordDynamicDestination value,
>>>             final OutputStream outStream) throws IOException {
>>>         final ObjectOutputStream out = new ObjectOutputStream(outStream);
>>>         out.writeUTF(value.getLogicalType());
>>>         out.writeInt(value.getYear());
>>>         out.writeInt(value.getMonth());
>>>         out.writeInt(value.getDay());
>>>         out.flush();
>>>     }
>>>
>>>     @Override
>>>     public GenericRecordDynamicDestination decode(final InputStream inStream)
>>>             throws IOException {
>>>         final ObjectInputStream in = new ObjectInputStream(inStream);
>>>         final String logicalType = in.readUTF();
>>>         final int year = in.readInt();
>>>         final int month = in.readInt();
>>>         final int day = in.readInt();
>>>         return new GenericRecordDynamicDestination(logicalType, year, month, day);
>>>     }
>>>
>>>     @Override
>>>     public void verifyDeterministic() throws NonDeterministicException {
>>>         // The encoding is a fixed sequence of writeUTF/writeInt calls, so
>>>         // equal destinations always encode to equal bytes; nothing to flag.
>>>     }
>>> }
>>> {code}
>>>
>>>
>>> On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <[email protected]>
>>> wrote:
>>>
>>>> I am writing a pipeline that reads from Kafka and converts the data into
>>>> Avro files with fixed windows of 10 minutes.
>>>>
>>>> I am using *DynamicAvroDestinations* in order to build a dynamic path and
>>>> select the corresponding schema based on the incoming data.
>>>>
>>>> 1.)
>>>> While testing on my machine (with the DirectRunner) I am using a file
>>>> (BoundedSource) containing hundreds of these messages to feed my
>>>> pipeline; however, I have found that sometimes the pipeline fails with:
>>>> {code}
>>>> Caused by: java.nio.file.FileAlreadyExistsException:
>>>> /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
>>>>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>>>>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>>>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>>>     at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
>>>>     at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
>>>>     at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
>>>>     at java.nio.file.Files.copy(Files.java:1274)
>>>>     at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
>>>>     at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
>>>>     at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
>>>>     at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
>>>> {code}
>>>>
>>>> 2.)
>>>> I am trying to group all related messages into a single Avro file
>>>> (YYYYMMDD-HHMM), but when the pipeline doesn't fail using the
>>>> DirectRunner, the generated Avro files only contain 1 record per file.
>>>>
>>>> If I generate a pseudo-random name in *getFilenamePolicy*, everything
>>>> works, but I end up with hundreds of files, each of them containing 1
>>>> record.
>>>>
>>>> My pipeline steps are (a minimal wiring sketch follows the list):
>>>> -- ReadFromSource
>>>> -- ApplyWindows
>>>>    (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
>>>> -- Create a KV from each KafkaRecord (the key is the date as YYYYMMDDHHMM)
>>>> -- GroupByKey (this returns KV<Long, Iterable<String>>)
>>>> -- Emit each record of the iterable as KV<Long, String>
>>>> -- AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())
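>>>>
>>>> In code, that wiring looks roughly like this (a sketch only;
>>>> readFromSource(), extractYyyymmddhhmm(), myDynamicDestinations, the temp
>>>> directory, and the hard-coded 10-minute window are placeholders for my
>>>> actual source, key extraction, destinations, and config):
>>>>
>>>> {code}
>>>> PCollection<String> messages = readFromSource(pipeline); // placeholder
>>>>
>>>> messages
>>>>     .apply("ApplyWindows",
>>>>         Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))))
>>>>     .apply("KeyByMinute",
>>>>         WithKeys.of((String msg) -> extractYyyymmddhhmm(msg)) // placeholder fn
>>>>             .withKeyType(TypeDescriptors.longs()))
>>>>     .apply(GroupByKey.create())
>>>>     // Re-emit each grouped value individually as KV<Long, String>
>>>>     .apply("EmitEach", ParDo.of(
>>>>         new DoFn<KV<Long, Iterable<String>>, KV<Long, String>>() {
>>>>           @ProcessElement
>>>>           public void process(@Element KV<Long, Iterable<String>> kv,
>>>>               OutputReceiver<KV<Long, String>> out) {
>>>>             for (String v : kv.getValue()) {
>>>>               out.output(KV.of(kv.getKey(), v));
>>>>             }
>>>>           }
>>>>         }))
>>>>     .apply(AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords()
>>>>         .to(myDynamicDestinations) // the DynamicAvroDestinations above
>>>>         .withTempDirectory(FileSystems.matchNewResource("/tmp/beam-tmp", true))
>>>>         .withWindowedWrites()
>>>>         .withNumShards(1));
>>>> {code}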
>>>>
>>>> Also, please find below the code for getFilenamePolicy:
>>>> {code}
>>>> @Override
>>>> public FileBasedSink.FilenamePolicy getFilenamePolicy(
>>>>         final GenericRecordDestination destination) {
>>>>     return new FileBasedSink.FilenamePolicy() {
>>>>         @Override
>>>>         public ResourceId windowedFilename(final int shardNumber,
>>>>                 final int numShards, final BoundedWindow window,
>>>>                 final PaneInfo paneInfo,
>>>>                 final FileBasedSink.OutputFileHints outputFileHints) {
>>>>
>>>>             final StringBuilder path = new StringBuilder(filesLocation);
>>>>             if (!filesLocation.endsWith("/")) {
>>>>                 path.append("/");
>>>>             }
>>>>
>>>>             path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
>>>>                     .append("/")
>>>>                     .append(filesPrefix)
>>>>                     .append("-day-")
>>>>                     .append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
>>>>                     .append("-w-").append(window.maxTimestamp().getMillis())
>>>>                     .append("-").append(shardNumber)
>>>>                     .append("_").append(numShards)
>>>>                     .append(AVRO_SUFFIX);
>>>>
>>>>             return FileSystems.matchNewResource(path.toString(), false);
>>>>         }
>>>>
>>>>         @Nullable
>>>>         @Override
>>>>         public ResourceId unwindowedFilename(final int shardNumber,
>>>>                 final int numShards,
>>>>                 final FileBasedSink.OutputFileHints outputFileHints) {
>>>>             throw new PipelineException("unwindowedFilename is not supported");
>>>>         }
>>>>     };
>>>> }
>>>> {code}
>>>>
>>>> Thanks
>>>> --
>>>>
>>>> JC
>>>>
>>>>
>>>
>>> --
>>>
>>> JC
>>>
>>>
>
> --
>
> JC
>
>
