Hi Nico,

Unfortunately I can't share any of data, but it is not even data being
processed at the point of failure - it is still in the
matching-files-from-GCS phase.

I am using Apache Beam's FileIO to match files and during one of those
match-files steps I get the failure above.

Currently I run the job and when a taskmanager shows this error I reset it
and restart the job. That works fine since the failure occurs at the
beginning of the job only. It seems to be a problem within some
taskmanagers, which is very odd considering that I have them all generated
by a Kubernetes deployment, i.e. they should be completely identical.
Sometimes I have to restart 3-4 of them until I have a running cluster.

I will try setting the temporary directory to something other than the
default, can't hurt.

Thanks for the help,
Encho

On Thu, Sep 13, 2018 at 3:41 PM Nico Kruber <n...@data-artisans.com> wrote:

> Hi Encho,
> the SpillingAdaptiveSpanningRecordDeserializer that you see in your
> stack trace is executed while reading input records from another task.
> If the (serialized) records are too large (> 5MiB), it will write and
> assemble them in a spilling channel, i.e. on disk, instead of using
> memory. This will use the temporary directories specified via
> "io.tmp.dirs" (or "taskmanager.tmp.dirs") which defaults to
> System.getProperty("java.io.tmpdir").
> -> These paths must actually be on an ordinary file system, not in gs://
> or so.
>
> The reason you only see this sporadically may be because not all your
> records are that big. It should, however, be deterministic in that it
> should always occur for the same record. Maybe something is wrong here
> and the record length is messed up, e.g. due to a bug in the
> de/serializer or the network stack.
>
> Do you actually have a minimal working example that you can share
> (either privately with me, or here) and shows this error?
>
>
> Nico
>
> On 29/08/18 14:19, Encho Mishinev wrote:
> > Hello,
> >
> > I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One
> > of my jobs involves reading from Google Cloud Storage which uses the
> > file scheme "gs://". Everything was fine but once in a while I would get
> > an exception that the scheme is not recognised. Now I've started seeing
> > them more often. It seems to be arbitrary - the exact same job with the
> > exact same parameters may finish successfully or throw this exception
> > and fail immediately. I can't figure out why it's not deterministic.
> > Here is the full exception logged upon the job failing:
> >
> > java.lang.Exception: The data preparation for task 'GroupReduce
> (GroupReduce at Match files from GCS/Via
> MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error:
> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
> terminated due to an exception: No filesystem found for scheme gs
> >       at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
> >       at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> >       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> >       at java.lang.Thread.run(Thread.java:748)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: No
> filesystem found for scheme gs
> >       at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
> >       at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
> >       at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
> >       at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
> >       ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: No filesystem found for scheme gs
> >       at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
> > Caused by: java.lang.IllegalArgumentException: No filesystem found for
> scheme gs
> >       at org.apache.beam.sdk.io
> .FileSystems.getFileSystemInternal(FileSystems.java:459)
> >       at org.apache.beam.sdk.io
> .FileSystems.matchNewResource(FileSystems.java:529)
> >       at
> org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
> >       at
> org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
> >       at
> org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
> >       at
> org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
> >       at
> org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
> >       at
> org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
> >       at
> org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
> >       at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
> >       at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
> >       at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
> >       at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
> >       at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
> >       at
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
> >       at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
> >       at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
> >       at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
> >       at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> >       at
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >       at org.apache.flink.runtime.io
> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
> >       at org.apache.flink.runtime.io
> .network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
> >       at org.apache.flink.runtime.io
> .network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> >       at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >       at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
> >       at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)
> >
> >
> > Any ideas why the behaviour is not deterministic regarding recognising
> file system schemes?
> >
> >
> > Thanks,
> >
> > Encho
> >
>
> --
> Nico Kruber | Software Engineer
> data Artisans
>
> Follow us @dataArtisans
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
> data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
>

Reply via email to