Hi Encho,
the SpillingAdaptiveSpanningRecordDeserializer that you see in your
stack trace is executed while reading input records from another task.
If a (serialized) record is too large (> 5 MiB), it will write and
assemble it in a spilling channel, i.e. on disk, instead of in memory.
This uses the temporary directories specified via "io.tmp.dirs" (or
"taskmanager.tmp.dirs"), which default to
System.getProperty("java.io.tmpdir").
-> These paths must be on an ordinary local file system, not on gs://
or the like.
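For example (just a sketch - "/data1/flink-tmp" etc. are placeholders;
point them at local directories on each TaskManager), in
flink-conf.yaml:

  # comma-separated list of local directories used for spilling
  io.tmp.dirs: /data1/flink-tmp,/data2/flink-tmp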

The reason you only see this sporadically may be that not all of your
records are that big. It should, however, be deterministic in the
sense that it always occurs for the same record. Maybe something is
wrong here and the record length gets corrupted, e.g. due to a bug in
the de/serializer or the network stack.
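As a toy illustration of what a corrupted length prefix would do (this
is not Flink's actual wire format, just the general failure mode):

import java.io.*;

public class CorruptedLengthDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    // two length-prefixed records, like a simple framing protocol
    for (String s : new String[] {"first", "gs://bucket/obj"}) {
      byte[] b = s.getBytes("UTF-8");
      out.writeInt(b.length);
      out.write(b);
    }
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    int len = in.readInt() + 3; // pretend the prefix got corrupted
    byte[] rec = new byte[len];
    in.readFully(rec); // also swallows 3 bytes of the next prefix
    // every later read now starts at a shifted offset and decodes
    // garbage - sometimes failing loudly, sometimes not
    System.out.println(new String(rec, "UTF-8"));
  }
}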

Do you actually have a minimal working example that shows this error
and that you can share (either privately with me, or here)?
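
If you want to double-check whether the "gs" scheme is registered at
all in a given JVM, a standalone probe along these lines may help (a
sketch; the bucket/object names are placeholders, and it assumes the
GCS registrar from the Beam GCP modules is on the classpath):

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class GsSchemeProbe {
  public static void main(String[] args) {
    // Populates Beam's scheme -> FileSystem mapping via ServiceLoader,
    // the same registration step the runner relies on.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
    // Throws "No filesystem found for scheme gs" when the GCS
    // registrar is missing in this JVM/classloader.
    System.out.println(
        FileSystems.matchNewResource("gs://some-bucket/some-object", false));
  }
}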


Nico

On 29/08/18 14:19, Encho Mishinev wrote:
> Hello,
> 
> I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One
> of my jobs involves reading from Google Cloud Storage which uses the
> file scheme "gs://". Everything was fine, but once in a while I would
> get an exception that the scheme is not recognised. Now I've started
> seeing it more often. It seems to be arbitrary - the exact same job with the
> exact same parameters may finish successfully or throw this exception
> and fail immediately. I can't figure out why it's not deterministic.
> Here is the full exception logged upon the job failing:
> 
> java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
>       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
>       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
>       at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
>       at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
>       ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
>       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
>       at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
>       at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
>       at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
>       at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
>       at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
>       at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
>       at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
>       at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
>       at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
>       at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>       at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
>       at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
>       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
>       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
>       at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
>       at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
>       at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
>       at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>       at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
>       at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>       at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
>       at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
> 
> 
> Any ideas why the behaviour is not deterministic with regard to
> recognising file system schemes?
> 
> 
> Thanks,
> 
> Encho
> 

-- 
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A, 10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
