Hi Encho,

the SpillingAdaptiveSpanningRecordDeserializer you see in your stack trace runs while reading input records from another task. If a (serialized) record is too large (> 5 MiB), it is written to and assembled in a spilling channel, i.e. on disk, instead of in memory. This uses the temporary directories specified via "io.tmp.dirs" (or "taskmanager.tmp.dirs"), which default to System.getProperty("java.io.tmpdir"). Note that these paths must point to an ordinary local file system, not to gs:// or the like.
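For illustration, spilling directories could be pointed at local disk in flink-conf.yaml like this (the paths below are placeholders, adjust them to your setup):

```
# flink-conf.yaml (example paths)
# One or more local directories used e.g. for spilling large records to disk;
# multiple directories can be given as a comma-separated list:
io.tmp.dirs: /mnt/disk1/flink-tmp,/mnt/disk2/flink-tmp
```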
The reason you only see this sporadically may be that not all your records are that big. It should, however, be deterministic in the sense that it should always occur for the same record. Maybe something is going wrong here and the record length gets messed up, e.g. due to a bug in the de/serializer or in the network stack. Do you have a minimal working example that reproduces this error and that you can share (either privately with me, or here)?

Nico

On 29/08/18 14:19, Encho Mishinev wrote:
> Hello,
>
> I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One
> of my jobs involves reading from Google Cloud Storage, which uses the
> file scheme "gs://". Everything was fine, but once in a while I would get
> an exception that the scheme is not recognised. Now I've started seeing
> them more often. It seems to be arbitrary - the exact same job with the
> exact same parameters may finish successfully or throw this exception
> and fail immediately. I can't figure out why it's not deterministic.
> Here is the full exception logged upon the job failing:
>
> java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce
>   at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)'
>   caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading
>   Thread' terminated due to an exception: No filesystem found for scheme gs
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>   Thread 'SortMerger Reading Thread' terminated due to an exception: No
>   filesystem found for scheme gs
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated
>   due to an exception: No filesystem found for scheme gs
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
>     at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
>     at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
>     at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
>     at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
>     at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
>     at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
>     at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
>     at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>     at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
>
> Any ideas why the behaviour is not deterministic regarding recognising file
> system schemes?
>
> Thanks,
> Encho

--
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A, 10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen