Hi Tim, thanks for the explanation; it makes more sense now why it was failing. :)
I opened a Jira ticket https://issues.apache.org/jira/browse/BEAM-5511 for this matter.

Thanks,
JC

On Wed, Sep 26, 2018 at 1:40 PM Tim Robertson <[email protected]> wrote:
> Hi Juan
>
> Well done for diagnosing your issue and thank you for taking the time to
> report it here.
>
> I'm not the author of this section but I've taken a quick look at the code
> and inline comments and have some observations which I think might help
> explain it.
>
> I notice it writes into temporary files and uses a HashMap<DestinationT,
> Writer> for maintaining a pool of writers for each destination. I presume
> that you are receiving a new instance of the DestinationT object on each
> call and therefore the HashMap will be treating these as separate entries;
> a new writer is created for each entry in the HashMap. The method
> responsible for providing the DestinationT is the following from the
> FileBasedSink, which does document the expectation:
>
> /**
>  * Returns an object that represents at a high level the destination being written to. May not
>  * return null. A destination must have deterministic hash and equality methods defined.
>  */
> public abstract DestinationT getDestination(UserT element);
>
> Beyond that I notice that it also relies on using a hashCode from the
> serialised object (i.e. after running through the coder), which you note
> too. The inline doc explains the reasoning for that: hashCode is not
> guaranteed to be stable across machines. When elements are processed on
> different machines we need deterministic behaviour to direct them to the
> correct target shard. To do that the code opts to use a murmur3_32
> algorithm, which is safe across machines (Today I learnt!), and it operates
> on the encoded bytes for the object, which are to be deterministic.
>
> I agree that we should improve the documentation and state that hashCode
> and equals need to be implemented when user-defined objects are used for
> the dynamic destination.
> Would you mind opening a Jira for that please?
>
> I hope this helps a little, and thanks again
> Tim
>
> On Wed, Sep 26, 2018 at 11:24 AM Juan Carlos Garcia <[email protected]> wrote:
>
>> Hi Guys, after days of bumping my head against the monitor I found why it
>> was not working.
>>
>> One key element when using *DynamicAvroDestinations* that is not
>> described in the documentation is that, if you are using a regular POJO as
>> *DestinationT* like I am (and not Long/String/Integer as in the example):
>>
>> {code}
>> DynamicAvroDestinations<String, GenericRecordDynamicDestination, GenericRecord>
>> {code}
>>
>> it's very important to pay attention to the equals / hashCode
>> implementations, which should be aligned with your
>> sharding/grouping/partition structure. Not doing so will give you the
>> result I described earlier (1 file (or shard) with only 1 record, or
>> sometimes just an exception).
>>
>> I still don't understand why it depends on equals / hashCode: when I
>> checked the class *org.apache.beam.sdk.io.WriteFiles.ApplyShardingKeyFn:688*,
>> the hashing depends on the Coder itself (method: <DestinationT> int
>> hashDestination(DestinationT destination, Coder<DestinationT>
>> destinationCoder)).
>>
>> Maybe a core member could explain the reason for it, or it's an unexpected
>> behavior and there is a bug somewhere else.
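The effect Tim describes (a pool of writers keyed by destination, where each new DestinationT instance becomes its own entry) can be reproduced with plain Java collections. The sketch below does not use Beam at all: a `HashMap` stands in for the per-destination writer pool, a `StringBuilder` stands in for a writer, and the class and method names (`DestinationKeyDemo`, `PlainDestination`, `ValueDestination`, `writersForPlain`, `writersForValue`) are hypothetical. It only illustrates the Java equality contract that the `getDestination` Javadoc asks for; the real WriteFiles code path is more involved than a single map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class DestinationKeyDemo {

    // Hypothetical destination WITHOUT equals/hashCode: it inherits identity
    // semantics from Object, so every new instance is a distinct HashMap key.
    static final class PlainDestination {
        final String logicalType;
        final int year;
        final int month;
        final int day;

        PlainDestination(String logicalType, int year, int month, int day) {
            this.logicalType = logicalType;
            this.year = year;
            this.month = month;
            this.day = day;
        }
    }

    // Hypothetical destination WITH value-based equals/hashCode over all fields.
    static final class ValueDestination {
        final String logicalType;
        final int year;
        final int month;
        final int day;

        ValueDestination(String logicalType, int year, int month, int day) {
            this.logicalType = logicalType;
            this.year = year;
            this.month = month;
            this.day = day;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ValueDestination)) return false;
            ValueDestination that = (ValueDestination) o;
            return year == that.year && month == that.month && day == that.day
                    && logicalType.equals(that.logicalType);
        }

        @Override
        public int hashCode() {
            return Objects.hash(logicalType, year, month, day);
        }
    }

    // Simulates n elements that all belong to the same logical destination,
    // where a fresh destination object is created for each element.
    // The map plays the role of the per-destination pool of writers.
    static int writersForPlain(int n) {
        Map<PlainDestination, StringBuilder> writers = new HashMap<>();
        for (int i = 0; i < n; i++) {
            PlainDestination dest = new PlainDestination("tracking", 2018, 9, 11);
            writers.computeIfAbsent(dest, d -> new StringBuilder()).append("record-").append(i).append('\n');
        }
        return writers.size();
    }

    static int writersForValue(int n) {
        Map<ValueDestination, StringBuilder> writers = new HashMap<>();
        for (int i = 0; i < n; i++) {
            ValueDestination dest = new ValueDestination("tracking", 2018, 9, 11);
            writers.computeIfAbsent(dest, d -> new StringBuilder()).append("record-").append(i).append('\n');
        }
        return writers.size();
    }

    public static void main(String[] args) {
        System.out.println("without equals/hashCode: " + writersForPlain(5) + " writer(s)");
        System.out.println("with equals/hashCode:    " + writersForValue(5) + " writer(s)");
    }
}
```

With identity semantics the five logically identical destinations occupy five map entries, while the value class collapses them into one. That mirrors the "one record per file" symptom reported in the thread.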
>>
>> In my case, below you can find my POJO destination along with the
>> corresponding Coder implementation, which works correctly as long as
>> equals / hashCode are implemented:
>>
>> {code}
>> static class GenericRecordDynamicDestination {
>>     private String logicalType;
>>     private final int year;
>>     private final int month;
>>     private final int day;
>>
>>     public GenericRecordDynamicDestination(final String _logicalType,
>>             final int _year, final int _month, final int _day) {
>>         logicalType = _logicalType;
>>         year = _year;
>>         month = _month;
>>         day = _day;
>>     }
>>
>>     public String getLogicalType() {
>>         return logicalType;
>>     }
>>
>>     public void setLogicalType(final String _logicalType) {
>>         logicalType = _logicalType;
>>     }
>>
>>     public int getYear() {
>>         return year;
>>     }
>>
>>     public int getMonth() {
>>         return month;
>>     }
>>
>>     public int getDay() {
>>         return day;
>>     }
>>
>>     @Override
>>     public boolean equals(final Object _o) {
>>         if (this == _o) return true;
>>         if (_o == null || getClass() != _o.getClass()) return false;
>>
>>         final GenericRecordDynamicDestination that =
>>                 (GenericRecordDynamicDestination) _o;
>>
>>         if (year != that.year) return false;
>>         if (month != that.month) return false;
>>         if (day != that.day) return false;
>>         return logicalType.equals(that.logicalType);
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>         int result = logicalType.hashCode();
>>         result = 31 * result + year;
>>         result = 31 * result + month;
>>         result = 31 * result + day;
>>         return result;
>>     }
>> }
>>
>> static class GenericRecordDestinationCoder extends
>>         CustomCoder<GenericRecordDynamicDestination> {
>>     @Override
>>     public void encode(final GenericRecordDynamicDestination value,
>>             final OutputStream outStream) throws IOException {
>>         final ObjectOutputStream out = new ObjectOutputStream(outStream);
>>         out.writeUTF(value.getLogicalType());
>>         out.writeInt(value.getYear());
>>         out.writeInt(value.getMonth());
>>         out.writeInt(value.getDay());
>>         out.flush();
>>     }
>>
>>     @Override
>>     public GenericRecordDynamicDestination decode(final InputStream
>>             inStream) throws IOException {
>>         final ObjectInputStream in = new ObjectInputStream(inStream);
>>         String logicalType = in.readUTF();
>>         int year = in.readInt();
>>         int month = in.readInt();
>>         int day = in.readInt();
>>         return new GenericRecordDynamicDestination(logicalType, year,
>>                 month, day);
>>     }
>>
>>     @Override
>>     public void verifyDeterministic() throws NonDeterministicException {
>>         //
>>     }
>> }
>> {code}
>>
>> On Thu, Sep 20, 2018 at 12:54 PM Juan Carlos Garcia <[email protected]> wrote:
>>
>>> I am writing a pipeline that will read from Kafka and convert the data
>>> into Avro files with fixed windows of 10 min.
>>>
>>> I am using *DynamicAvroDestinations* in order to build a dynamic path
>>> and select the corresponding schema based on the incoming data.
>>>
>>> 1.)
>>> While testing on my machine (with the DirectRunner) I am feeding my
>>> pipeline from a file (BoundedSource) containing hundreds of these
>>> messages; however, I have found that sometimes the pipeline fails with:
>>> {code}Caused by: java.nio.file.FileAlreadyExistsException:
>>> /tmp/test-tracking/2018/09/tracking-day-11-w--9223372036200001-0_4.avro
>>> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
>>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>> at sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:243)
>>> at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:581)
>>> at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
>>> at java.nio.file.Files.copy(Files.java:1274)
>>> at org.apache.beam.sdk.io.LocalFileSystem.copy(LocalFileSystem.java:143)
>>> at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:301)
>>> at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.moveToOutputFiles(FileBasedSink.java:756)
>>> at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:801)
>>> {code}
>>>
>>> 2.) Trying to group all related messages into a single Avro file
>>> (YYYYMMDD-HHMM): when the pipeline doesn't fail using the DirectRunner,
>>> the generated Avro files only contain 1 record per file.
>>>
>>> If I generate a pseudo-random name in *getFilenamePolicy* everything
>>> works, but I end up with hundreds of files, each of them containing 1
>>> record.
>>>
>>> My pipeline steps are:
>>> -- ReadFromSource
>>> -- ApplyWindows (FixedWindows.of(Duration.standardSeconds(config.getWindowDuration())))
>>> -- Create a KV from KafkaRecord (the key is the date as YYYYMMDDHHMM)
>>> -- GroupByKey (this returns a KV<Long, Iterable<String>>)
>>> -- Emit each record of the iterable as (KV<Long, String>)
>>> -- AvroIO ( AvroIO.<KV<Long, String>>writeCustomTypeToGenericRecords() )
>>>
>>> Also, please find below the code for getFilenamePolicy:
>>> {code}
>>> @Override
>>> public FileBasedSink.FilenamePolicy getFilenamePolicy(final
>>>         GenericRecordDestination destination) {
>>>     return new FileBasedSink.FilenamePolicy() {
>>>         @Override
>>>         public ResourceId windowedFilename(final int shardNumber, final int numShards,
>>>                 final BoundedWindow window, final PaneInfo paneInfo,
>>>                 final FileBasedSink.OutputFileHints outputFileHints) {
>>>
>>>             StringBuilder path = new StringBuilder(filesLocation);
>>>             if (!filesLocation.endsWith("/")) {
>>>                 path.append("/");
>>>             }
>>>
>>>             path.append(DateTimeUtil.format(destination.getTimestamp(), "yyyy/MM"))
>>>                 .append("/")
>>>                 .append(filesPrefix)
>>>                 .append("-day-").append(DateTimeUtil.format(destination.getTimestamp(), "dd"))
>>>                 .append("-w-").append(window.maxTimestamp().getMillis())
>>>                 .append("-").append(shardNumber)
>>>                 .append("_").append(numShards)
>>>                 .append(AVRO_SUFFIX);
>>>
>>>             return FileSystems.matchNewResource(path.toString(), false);
>>>         }
>>>
>>>         @Nullable
>>>         @Override
>>>         public ResourceId unwindowedFilename(final int shardNumber, final int numShards,
>>>                 final FileBasedSink.OutputFileHints outputFileHints) {
>>>             throw new PipelineException("unwindowedFilename is not supported");
>>>         }
>>>     };
>>> }
>>> {code}
>>>
>>> Thanks
>>> --
>>>
>>> JC
>>
>> --
>>
>> JC

--
JC
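Tim's point that sharding hashes the *encoded* bytes of the destination means the coder, not only equals/hashCode, has to be deterministic. Juan's coder works, as he reports; the sketch below shows one possible leaner variant that writes the same fields through `DataOutputStream` / `DataInputStream` instead of `ObjectOutputStream` / `ObjectInputStream`, which avoids the Java-serialization stream header that `ObjectOutputStream` writes on every `encode` call. To stay self-contained it does not depend on Beam: `encode`/`decode` merely have the same shape as `CustomCoder#encode(T, OutputStream)` and `CustomCoder#decode(InputStream)`, and `Dest`, `toBytes`, `fromBytes` and `objectStreamSize` are hypothetical names. `Arrays.hashCode` is only a stand-in for the murmur3_32 hash the thread mentions. `Dest` is also made immutable, because a destination whose fields can change through a setter (like `setLogicalType` in the thread) would change its hashCode after being used as a map key.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class DestinationEncodingDemo {

    // Hypothetical immutable destination with the same fields as
    // GenericRecordDynamicDestination (no setter, so its hash cannot change).
    static final class Dest {
        final String logicalType;
        final int year, month, day;

        Dest(String logicalType, int year, int month, int day) {
            this.logicalType = logicalType;
            this.year = year;
            this.month = month;
            this.day = day;
        }
    }

    // Same shape as CustomCoder#encode: writes only the field bytes, in a fixed
    // order, with no Java-serialization stream header.
    static void encode(Dest value, OutputStream outStream) throws IOException {
        DataOutputStream out = new DataOutputStream(outStream);
        out.writeUTF(value.logicalType);
        out.writeInt(value.year);
        out.writeInt(value.month);
        out.writeInt(value.day);
        out.flush(); // flush but do not close: the caller owns outStream
    }

    // Same shape as CustomCoder#decode: reads the fields back in the same order.
    static Dest decode(InputStream inStream) throws IOException {
        DataInputStream in = new DataInputStream(inStream);
        String logicalType = in.readUTF();
        int year = in.readInt();
        int month = in.readInt();
        int day = in.readInt();
        return new Dest(logicalType, year, month, day);
    }

    static byte[] toBytes(Dest value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            encode(value, bytes);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Dest fromBytes(byte[] encoded) {
        try {
            return decode(new ByteArrayInputStream(encoded));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Size of the same fields written through ObjectOutputStream, as the coder in
    // the thread does; its constructor alone writes a 4-byte stream header.
    static int objectStreamSize(Dest value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeUTF(value.logicalType);
            out.writeInt(value.year);
            out.writeInt(value.month);
            out.writeInt(value.day);
            out.flush();
            return bytes.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] ea = toBytes(new Dest("tracking", 2018, 9, 11));
        byte[] eb = toBytes(new Dest("tracking", 2018, 9, 11)); // distinct instance, same value
        // Equal values give identical bytes, so any hash over the encoded bytes agrees.
        System.out.println("identical encodings: " + Arrays.equals(ea, eb));
        System.out.println("byte hashes match: " + (Arrays.hashCode(ea) == Arrays.hashCode(eb)));
        Dest back = fromBytes(ea);
        System.out.println("round trip: " + back.logicalType + " " + back.year + "-" + back.month + "-" + back.day);
        System.out.println("DataOutputStream bytes: " + ea.length
                + ", ObjectOutputStream bytes: " + objectStreamSize(back));
    }
}
```

In a real pipeline the two method bodies would sit inside a `CustomCoder<GenericRecordDynamicDestination>` subclass; because they write the same fields in a fixed order, equal destinations should always produce identical bytes.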
