Side note: please consider using FileIO.write() instead; it offers a much more pleasant syntax, even though the underlying implementation is the same and it wouldn't avoid this particular pitfall.
On Wed, Sep 26, 2018, 8:53 AM Juan Carlos Garcia <[email protected]> wrote:

> Hi Tim, thanks for the explanation; it makes more sense now why it was failing. :)
>
> I opened a Jira ticket, https://issues.apache.org/jira/browse/BEAM-5511, for this matter.
>
> Thanks,
> JC
>
> On Wed, Sep 26, 2018 at 1:40 PM Tim Robertson <[email protected]> wrote:
>
>> Hi Juan,
>>
>> Well done for diagnosing your issue, and thank you for taking the time to report it here.
>>
>> I'm not the author of this section, but I've taken a quick look at the code and the inline comments, and I have some observations which I think might help explain it.
>>
>> I notice it writes into temporary files and uses a HashMap<DestinationT, Writer> to maintain a pool of writers, one per destination. I presume that you are receiving a new instance of the DestinationT object on each call, and therefore the HashMap will treat these as separate entries; a new writer is created for each entry in the HashMap. The method responsible for providing the DestinationT is the following, from FileBasedSink, which does document the expectation:
>>
>> /**
>>  * Returns an object that represents at a high level the destination being written to. May not
>>  * return null. A destination must have deterministic hash and equality methods defined.
>>  */
>> public abstract DestinationT getDestination(UserT element);
>>
>> Beyond that, I notice that it also relies on a hashCode computed from the serialized object (i.e. after running it through the coder), which you note too. The inline doc explains the reasoning: hashCode is not guaranteed to be stable across machines, and when elements are processed on different machines we need deterministic behaviour to direct them to the correct target shard. To do that, the code opts for a murmur3_32 algorithm, which is stable across machines (today I learnt!) and operates on the encoded bytes of the object, which are required to be deterministic.
>>
>> I agree that we should improve the documentation and state that hashCode and equals need to be implemented when user-defined objects are used for the dynamic destination. Would you mind opening a Jira for that, please?
>>
>> I hope this helps a little, and thanks again.
>> Tim
>>
>> On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia <[email protected]> wrote:
>>
>>> Hi guys, after days of bumping my head against the monitor I found why it was not working.
>>>
>>> One key element when using *DynamicAvroDestinations* that is not described in the documentation is that, if you are using a regular POJO as *DestinationT* like I am (and not Long/String/Integer as in the example):
>>>
>>> {code}
>>> DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord>
>>> {code}
>>>
>>> it is very important to pay attention to the equals / hashCode implementations, which should be aligned with your sharding/grouping/partition structure. Not doing so will give you the result I described earlier (one file (or shard) with only one record, or sometimes just an exception).
>>>
>>> While I still don't understand why it depends on equals / hashCode: as I checked in *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*, the hashing depends on the Coder itself (method: <DestinationT> int hashDestination(DestinationT destination, Coder<DestinationT> destinationCoder)).
>>>
>>> Maybe a core member could explain the reason for it, or it is unexpected behavior and there is a bug somewhere else.
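Tim's point about the writer pool can be reproduced with plain Java, outside of Beam entirely. The sketch below is not Beam code; the class and field names are illustrative. It shows why a HashMap keyed by a POJO that lacks equals/hashCode opens a new "writer" for every fresh instance, while value-based equals/hashCode collapses logically identical destinations into one entry:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class DestinationKeyDemo {
    // A key WITHOUT equals/hashCode overrides: HashMap falls back to object identity,
    // like a POJO DestinationT that does not override them.
    static final class IdentityKey {
        final String logicalType;
        final int year;
        IdentityKey(String logicalType, int year) {
            this.logicalType = logicalType;
            this.year = year;
        }
    }

    // The same key WITH value-based equals/hashCode.
    static final class ValueKey {
        final String logicalType;
        final int year;
        ValueKey(String logicalType, int year) {
            this.logicalType = logicalType;
            this.year = year;
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValueKey)) return false;
            ValueKey that = (ValueKey) o;
            return year == that.year && logicalType.equals(that.logicalType);
        }
        @Override
        public int hashCode() {
            return Objects.hash(logicalType, year);
        }
    }

    static int writersWithoutEquals() {
        Map<IdentityKey, String> writers = new HashMap<>();
        // Two logically identical destinations arriving as fresh instances:
        writers.putIfAbsent(new IdentityKey("tracking", 2018), "writer-1");
        writers.putIfAbsent(new IdentityKey("tracking", 2018), "writer-2");
        return writers.size(); // 2: each instance got its own entry/"writer"
    }

    static int writersWithEquals() {
        Map<ValueKey, String> writers = new HashMap<>();
        writers.putIfAbsent(new ValueKey("tracking", 2018), "writer-1");
        writers.putIfAbsent(new ValueKey("tracking", 2018), "writer-2");
        return writers.size(); // 1: the second lookup found and reused the first entry
    }

    public static void main(String[] args) {
        System.out.println("without equals/hashCode: " + writersWithoutEquals() + " writers");
        System.out.println("with equals/hashCode:    " + writersWithEquals() + " writers");
    }
}
```

This is exactly the behaviour the getDestination Javadoc warns about: without deterministic hash and equality, each element looks like a brand-new destination.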
>>>
>>> In my case, below you can find my POJO destination along with the corresponding coder implementation, which works correctly as long as equals / hashCode are implemented:
>>>
>>> {code}
>>> static class GenericRecordDynamicDestination {
>>>     private String logicalType;
>>>     private final int year;
>>>     private final int month;
>>>     private final int day;
>>>
>>>     public GenericRecordDynamicDestination(final String _logicalType, final int _year, final int _month, final int _day) {
>>>         logicalType = _logicalType;
>>>         year = _year;
>>>         month = _month;
>>>         day = _day;
>>>     }
>>>
>>>     public String getLogicalType() {
>>>         return logicalType;
>>>     }
>>>
>>>     public void setLogicalType(final String _logicalType) {
>>>         logicalType = _logicalType;
>>>     }
>>>
>>>     public int getYear() {
>>>         return year;
>>>     }
>>>
>>>     public int getMonth() {
>>>         return month;
>>>     }
>>>
>>>     public int getDay() {
>>>         return day;
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(final Object _o) {
>>>         if (this == _o) return true;
>>>         if (_o == null || getClass() != _o.getClass()) return false;
>>>
>>>         final GenericRecordDynamicDestination that = (GenericRecordDynamicDestination) _o;
>>>
>>>         if (year != that.year) return false;
>>>         if (month != that.month) return false;
>>>         if (day != that.day) return false;
>>>         return logicalType.equals(that.logicalType);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>         int result = logicalType.hashCode();
>>>         result = 31 * result + year;
>>>         result = 31 * result + month;
>>>         result = 31 * result + day;
>>>         return result;
>>>     }
>>> }
>>>
>>> static class GenericRecordDestinationCoder extends CustomCoder<GenericRecordDynamicDestination> {
>>>     @Override
>>>     public void encode(final GenericRecordDynamicDestination value, final OutputStream outStream) throws IOException {
>>>         final ObjectOutputStream out = new ObjectOutputStream(outStream);
>>>         out.writeUTF(value.getLogicalType());
>>>         out.writeInt(value.getYear());
>>>         out.writeInt(value.getMonth());
>>>         out.writeInt(value.getDay());
>>>         out.flush();
>>>     }
>>>
>>>     @Override
>>>     public GenericRecordDynamicDestination decode(final InputStream inStream) throws IOException {
>>>         final ObjectInputStream in = new ObjectInputStream(inStream);
>>>         String logicalType = in.readUTF();
>>>         int year = in.readInt();
>>>         int month = in.readInt();
>>>         int day = in.readInt();
>>>         return new GenericRecordDynamicDestination(logicalType, year, month, day);
>>>     }
>>>
>>>     @Override
>>>     public void verifyDeterministic() throws NonDeterministicException {
>>>         // the field-by-field encoding above is deterministic
>>>     }
>>> }
>>> {code}
>>>
>>> On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <[email protected]> wrote:
>>>
>>>> I am writing a pipeline that will read from Kafka and convert the data into Avro files with a fixed window of 10 min.
>>>>
>>>> I am using a *DynamicAvroDestinations* in order to build a dynamic path and select the corresponding schema based on the incoming data.
>>>>
>>>> 1.)
>>>> While testing on my machine (with the DirectRunner) I am using a file (BoundedSource) containing hundreds of these messages and feeding my pipeline with it; however, I have found that sometimes the pipeline fails with:
>>>>
>>>> {code}
>>>> Caused by: java.nio.file.FileAlreadyExistsException: /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
>>>>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>>>>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>>>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>>>     at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
>>>>     at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
>>>>     at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
>>>>     at java.nio.file.Files.copy(Files.java:1274)
>>>>     at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
>>>>     at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
>>>>     at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
>>>>     at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
>>>> {code}
>>>>
>>>> 2.) Trying to group all related messages into a single Avro file (YYYYMMDD-HHMM): when the pipeline doesn't fail using the DirectRunner, the generated Avro files contain only one record per file.
>>>>
>>>> If I generate a pseudo-random name in *getFilenamePolicy*, everything works, but I end up with hundreds of files, each of them containing one record.
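For reference, the FileAlreadyExistsException in the stack trace above is standard JDK behaviour, not something Beam-specific: java.nio.file.Files.copy refuses to overwrite an existing target unless StandardCopyOption.REPLACE_EXISTING is passed, so two writers finalizing to the same filename will collide. A minimal, Beam-free reproduction (the temp-file name prefixes are illustrative):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CopyCollisionDemo {
    // Returns true when copying onto an existing target throws FileAlreadyExistsException.
    static boolean copyCollides() {
        try {
            Path src = Files.createTempFile("tracking-src-", ".avro");
            Path dst = Files.createTempFile("tracking-dst-", ".avro"); // target already exists
            try {
                Files.copy(src, dst); // no REPLACE_EXISTING option, so this must fail
                return false;
            } catch (FileAlreadyExistsException expected) {
                return true; // same failure mode as the stack trace in the thread
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("collision raised: " + copyCollides());
    }
}
```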
>>>>
>>>> My pipeline steps are:
>>>> -- ReadFromSource
>>>> -- ApplyWindows (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
>>>> -- Create a KV from the KafkaRecord (the key is the date as YYYYMMDDHHMM)
>>>> -- GroupByKey (this returns a KV<Long, Iterable<String>>)
>>>> -- Emit each record of the iterable as KV<Long, String>
>>>> -- AvroIO (AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords())
>>>>
>>>> Also, please find below the code for getFilenamePolicy:
>>>>
>>>> {code}
>>>> @Override
>>>> public FileBasedSink.FilenamePolicy getFilenamePolicy(final GenericRecordDestination destination) {
>>>>     return new FileBasedSink.FilenamePolicy() {
>>>>         @Override
>>>>         public ResourceId windowedFilename(final int shardNumber, final int numShards, final BoundedWindow window, final PaneInfo paneInfo, final FileBasedSink.OutputFileHints outputFileHints) {
>>>>             StringBuilder path = new StringBuilder(filesLocation);
>>>>             if (!filesLocation.endsWith("/")) {
>>>>                 path.append("/");
>>>>             }
>>>>
>>>>             path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
>>>>                 .append("/")
>>>>                 .append(filesPrefix)
>>>>                 .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
>>>>                 .append("-w-").append(window.maxTimestamp().getMillis())
>>>>                 .append("-").append(shardNumber)
>>>>                 .append("_").append(numShards)
>>>>                 .append(AVRO_SUFFIX);
>>>>
>>>>             return FileSystems.matchNewResource(path.toString(), false);
>>>>         }
>>>>
>>>>         @Nullable
>>>>         @Override
>>>>         public ResourceId unwindowedFilename(final int shardNumber, final int numShards, final FileBasedSink.OutputFileHints outputFileHints) {
>>>>             throw new PipelineException("unwindowedFilename is not supported");
>>>>         }
>>>>     };
>>>> }
>>>> {code}
>>>>
>>>> Thanks
>>>> --
>>>> JC
>>>
>>> --
>>> JC
>
> --
> JC
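As a standalone check, the StringBuilder logic in the filename policy above can be exercised outside Beam. The sketch below mirrors it with plain java.time in place of the poster's DateTimeUtil helper (the input values are hypothetical, chosen to match the path seen in the stack trace earlier in the thread); it makes clear that every element mapping to the same date, window, and shard produces the same filename, which is where the copy collision comes from:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class FilenameSketch {
    // Rebuilds the windowedFilename path string from the policy in the thread.
    static String windowedFilename(String filesLocation, String filesPrefix, LocalDate day,
                                   long windowMaxMillis, int shardNumber, int numShards) {
        StringBuilder path = new StringBuilder(filesLocation);
        if (!filesLocation.endsWith("/")) {
            path.append("/");
        }
        path.append(day.format(DateTimeFormatter.ofPattern("yyyy/MM")))
            .append("/")
            .append(filesPrefix)
            .append("-day-").append(day.format(DateTimeFormatter.ofPattern("dd")))
            .append("-w-").append(windowMaxMillis)
            .append("-").append(shardNumber)
            .append("_").append(numShards)
            .append(".avro");
        return path.toString();
    }

    public static void main(String[] args) {
        // Hypothetical inputs reproducing the colliding path from the exception:
        System.out.println(windowedFilename("/tmp/test-tracking", "tracking",
                LocalDate.of(2018, 9, 11), -9223372036200001L, 0, 4));
        // → /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
    }
}
```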
