I don’t know if this is helpful, but I’d run into a similar issue (array index 
out of bounds during Kryo deserialization) due to having a different version of 
Kryo on the classpath.
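
A quick way to confirm which Kryo actually gets loaded at runtime (just a
diagnostic sketch; run it from the job's classpath — Flink 1.0.x bundles
Kryo 2.24.0, if I remember correctly):

```
// Print the jar the Kryo class was loaded from; if it is not the version
// bundled with Flink, a classpath conflict like the one above is likely.
val kryoJar = classOf[com.esotericsoftware.kryo.Kryo]
  .getProtectionDomain.getCodeSource.getLocation
println(s"Kryo loaded from: $kryoJar")
```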

— Ken

> On Apr 26, 2016, at 6:23pm, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> 
> I built master with scala 2.11 and hadoop 2.7.1, now get a different 
> exception (still serialization-related though):
> 
> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162))
> -> Filter (Filter at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))',
> caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading
> Thread' terminated due to an exception: Index: 97, Size: 11
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>     at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated
> due to an exception: Index: 97, Size: 11
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>     at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
>     at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> 
> 
> 
> On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Then let's keep our fingers crossed that we've found the culprit :-)
> 
> On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> Thank you Till.
> 
> I will try to run with new binaries today. As I have mentioned, the error is 
> reproducible only on a full dataset, so coming up with sample input data may 
> be problematic (not to mention that the real data can't be shared). I'll see 
> if I can replicate it, but could take a bit longer. Thank you very much for 
> your effort.
> 
> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Timur,
> 
> I’ve got good and not so good news. Let’s start with the not so good news. I 
> couldn’t reproduce your problem but the good news is that I found a bug in 
> the duplication logic of the OptionSerializer. I’ve already committed a patch 
> to the master to fix it.
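> 
> (For context: TypeSerializer#duplicate must return an independent instance
> whenever the serializer holds mutable state. A minimal sketch of that
> contract, using a hypothetical Duplicable trait in place of the real
> TypeSerializer — this is an illustration, not the actual patch:)
> ```
> trait Duplicable[A] { def duplicate(): Duplicable[A] }
> 
> class OptionLikeSerializer[A](nested: Duplicable[A]) extends Duplicable[Option[A]] {
>   override def duplicate(): Duplicable[Option[A]] = {
>     val dup = nested.duplicate()
>     // A stateless nested serializer may return itself; only then is it
>     // safe to reuse this wrapper. A stateful nested serializer (e.g. a
>     // Kryo-backed one) must yield a fresh wrapper, otherwise the sorter's
>     // threads share one mutable instance and corrupt the byte stream.
>     if (dup eq nested) this else new OptionLikeSerializer(dup)
>   }
> }
> ```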
> 
> Thus, I wanted to ask you, whether you could try out the latest master and 
> check whether your problem still persists. If that’s the case, could you send 
> me your complete code with sample input data which reproduces your problem?
> 
> Cheers,
> Till
> 
> 
> On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Could this be caused by the disabled reference tracking in our Kryo
> serializer? From the stack trace it looks like it's failing when trying to
> deserialize the traits that are wrapped in Options.
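> 
> (Reference tracking is a plain flag on Kryo; a standalone sketch for
> illustration, with a local Kryo instance rather than the one Flink manages
> internally:)
> ```
> import com.esotericsoftware.kryo.Kryo
> 
> val kryo = new Kryo()
> // With references enabled, Kryo writes back-references to previously
> // seen objects; a reader configured differently misreads those IDs,
> // which can surface exactly as the IndexOutOfBoundsException in
> // MapReferenceResolver.getReadObject seen above.
> kryo.setReferences(true)
> ```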
> 
> On Tue, 26 Apr 2016 at 10:09 Ufuk Celebi <u...@apache.org> wrote:
> Hey Timur,
> 
> I'm sorry about this bad experience.
> 
> From what I can tell, there is nothing unusual with your code. It's
> probably an issue with Flink.
> 
> I think we have to wait a little longer to hear what others in the
> community say about this.
> 
> @Aljoscha, Till, Robert: any ideas what might cause this?
> 
> – Ufuk
> 
> 
> On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> > Still trying to resolve this serialization issue. I was able to hack it by
> > 'serializing' `Record` to String and then 'deserializing' it in coGroup, but
> > boy, it's so ugly.
> >
> > So the bug is that it can't deserialize the case class that has the
> > structure (slightly different and more detailed than I stated above):
> > ```
> > case class Record(name: Name, phone: Option[Phone], address:
> > Option[Address])
> >
> > case class Name(givenName: Option[String], middleName: Option[String],
> > familyName: Option[String], generationSuffix: Option[String] = None)
> >
> > trait Address{
> >   val city: String
> >   val state: String
> >   val country: String
> >   val latitude: Double
> >   val longitude: Double
> >   val postalCode: String
> >   val zip4: String
> >   val digest: String
> > }
> >
> >
> > case class PoBox(city: String,
> >                  state: String,
> >                  country: String,
> >                  latitude: Double,
> >                  longitude: Double,
> >                  postalCode: String,
> >                  zip4: String,
> >                  digest: String,
> >                  poBox: String
> >                 ) extends Address
> >
> > case class PostalAddress(city: String,
> >                          state: String,
> >                          country: String,
> >                          latitude: Double,
> >                          longitude: Double,
> >                          postalCode: String,
> >                          zip4: String,
> >                          digest: String,
> >                          preDir: String,
> >                          streetName: String,
> >                          streetType: String,
> >                          postDir: String,
> >                          house: String,
> >                          aptType: String,
> >                          aptNumber: String
> >                         ) extends Address
> > ```
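> >
> > Since `Address` is a trait with two case class implementations, I assume
> > Flink treats those fields as generic types and falls back to Kryo for
> > them. One workaround I might try (just a guess, untested) is registering
> > the concrete subtypes up front:
> > ```
> > // Register the concrete Address implementations with Kryo so instances
> > // are written with stable registration IDs instead of full class names.
> > env.getConfig.registerKryoType(classOf[PoBox])
> > env.getConfig.registerKryoType(classOf[PostalAddress])
> > ```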
> >
> > I would expect serialization to be one of Flink's cornerstones and to be
> > well tested, so there is a high chance that I'm doing something wrong, but I
> > can't really find anything unusual in my code.
> >
> > Any suggestion on what to try is highly welcome.
> >
> > Thanks,
> > Timur
> >
> >
> > On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> >>
> >> Hello Robert,
> >>
> >> I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue
> >> with the cluster (which I didn't dig into); when I restarted the cluster I
> >> was able to get past it. Now I have the following exception:
> >>
> >> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
> >> at
> >> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158))
> >> -> Filter (Filter at
> >> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))',
> >> caused an error: Error obtaining the sorted input: Thread 'SortMerger
> >> Reading Thread' terminated due to an exception: Serializer consumed more
> >> bytes than the record had. This indicates broken serialization. If you are
> >> using custom serialization types (Value or Writable), check their
> >> serialization methods. If you are using a Kryo-serialized type, check the
> >> corresponding Kryo serializer.
> >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
> >> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> at java.lang.Thread.run(Thread.java:745)
> >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> >> Thread 'SortMerger Reading Thread' terminated due to an exception:
> >> Serializer consumed more bytes than the record had. This indicates broken
> >> serialization. If you are using custom serialization types (Value or
> >> Writable), check their serialization methods. If you are using a
> >> Kryo-serialized type, check the corresponding Kryo serializer.
> >> at
> >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> >> at
> >> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> >> at
> >> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
> >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> >> ... 3 more
> >> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> >> terminated due to an exception: Serializer consumed more bytes than the
> >> record had. This indicates broken serialization. If you are using custom
> >> serialization types (Value or Writable), check their serialization methods.
> >> If you are using a Kryo-serialized type, check the corresponding Kryo
> >> serializer.
> >> at
> >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> >> Caused by: java.io.IOException: Serializer consumed more bytes than the
> >> record had. This indicates broken serialization. If you are using custom
> >> serialization types (Value or Writable), check their serialization methods.
> >> If you are using a Kryo-serialized type, check the corresponding Kryo
> >> serializer.
> >> at
> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> >> at
> >> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >> at
> >> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >> at
> >> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >> at
> >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> >> at
> >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> >> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
> >> at
> >> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
> >> at
> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
> >> at
> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
> >> at org.apache.flink.types.StringValue.readString(StringValue.java:771)
> >> at
> >> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> >> at
> >> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
> >> at
> >> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> >> at
> >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
> >> at
> >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> >> at
> >> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >> at
> >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >> ... 5 more
> >>
> >> Thanks,
> >> Timur
> >>
> >> On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org> wrote:
> >>>
> >>> For the second exception, can you check the logs of the failing
> >>> taskmanager (10.105.200.137)?
> >>> I guess these logs contain some details on why the TM timed out.
> >>>
> >>>
> >>> Are you on 1.0.x or on 1.1-SNAPSHOT?
> >>> We recently changed something related to the ExecutionConfig, which has
> >>> led to Kryo issues in the past.
> >>>
> >>>
> >>> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> >>>>
> >>>> Trying to use ProtobufSerializer -- program consistently fails with the
> >>>> following exception:
> >>>>
> >>>> java.lang.IllegalStateException: Update task on instance
> >>>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
> >>>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
> >>>> at
> >>>> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
> >>>> at akka.dispatch.OnFailure.internal(Future.scala:228)
> >>>> at akka.dispatch.OnFailure.internal(Future.scala:227)
> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> >>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> >>>> at
> >>>> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> >>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
> >>>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> >>>> at
> >>>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
> >>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>> at
> >>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>>> at
> >>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>>> at
> >>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> >>>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
> >>>> after [10000 ms]
> >>>> at
> >>>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> >>>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> >>>> at
> >>>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> >>>> at
> >>>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> >>>> at
> >>>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> >>>> at
> >>>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> >>>> at
> >>>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> >>>> at
> >>>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> >>>> at
> >>>> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> >>>> at java.lang.Thread.run(Thread.java:745)
> >>>>
> >>>> I'm at my wits' end now, any suggestions are highly appreciated.
> >>>>
> >>>> Thanks,
> >>>> Timur
> >>>>
> >>>>
> >>>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> I'm running a Flink program that is failing with the following
> >>>>> exception:
> >>>>>
> >>>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
> >>>>> - Error while running the command.
> >>>>> org.apache.flink.client.program.ProgramInvocationException: The program
> >>>>> execution failed: Job execution failed.
> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >>>>> at
> >>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >>>>> at
> >>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >>>>> at
> >>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> >>>>> at
> >>>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
> >>>>> at
> >>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
> >>>>> at
> >>>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
> >>>>> at scala.Option.foreach(Option.scala:257)
> >>>>> at
> >>>>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
> >>>>> at
> >>>>> com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
> >>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>> at
> >>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> >>>>> at
> >>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>> at java.lang.reflect.Method.invoke(Method.java:606)
> >>>>> at
> >>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>> at
> >>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>> at
> >>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>> at
> >>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> >>>>> execution failed.
> >>>>> at
> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
> >>>>> at
> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> >>>>> at
> >>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> >>>>> at
> >>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> >>>>> at
> >>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> >>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> >>>>> at
> >>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> >>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>>> at
> >>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> >>>>> at
> >>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> >>>>> at
> >>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>>>> at
> >>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>>>> Caused by: java.lang.Exception: The data preparation for task 'CHAIN
> >>>>> CoGroup (CoGroup at
> >>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
> >>>>> Filter (Filter at
> >>>>> com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))',
> >>>>> caused an error: Error obtaining the sorted input: Thread 'SortMerger
> >>>>> Reading Thread' terminated due to an exception: No more bytes left.
> >>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
> >>>>> input: Thread 'SortMerger Reading Thread' terminated due to an 
> >>>>> exception: No
> >>>>> more bytes left.
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
> >>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> >>>>> ... 3 more
> >>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> >>>>> terminated due to an exception: No more bytes left.
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> >>>>> Caused by: java.io.EOFException: No more bytes left.
> >>>>> at
> >>>>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
> >>>>> at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
> >>>>> at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
> >>>>> at
> >>>>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
> >>>>> at
> >>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> >>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
> >>>>> at
> >>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> >>>>> at
> >>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
> >>>>> at
> >>>>> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
> >>>>> at
> >>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> >>>>> at
> >>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
> >>>>> at
> >>>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> >>>>> at
> >>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
> >>>>> at
> >>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> >>>>> at
> >>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >>>>> at
> >>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
> >>>>> at
> >>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >>>>> at
> >>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> >>>>> at
> >>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> >>>>>
> >>>>> The simplified version of the code looks more or less like the following:
> >>>>> ```
> >>>>> case class Name(first: String, last: String)
> >>>>> case class Phone(number: String)
> >>>>> case class Address(addr: String, city: String, country: String)
> >>>>> case class Record(name: Name, phone: Option[Phone], address: Option[Address])
> >>>>> ...
> >>>>> def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) =>
> >>>>> String = ...
> >>>>> ...
> >>>>> val data = env.readCsvFile[MySchema](...).map(Record(_))
> >>>>>
> >>>>> val helper: DataSet[(Name, String)] = ...
> >>>>>
> >>>>> val result = data.filter(_.address.isDefined)
> >>>>>   .coGroup(helper)
> >>>>>   .where(e => LegacyDigest.buildMessageDigest((e.name, e.address.get.country)))
> >>>>>   .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
> >>>>>   .apply {resolutionFunc}
> >>>>>   .filter(_ != "")
> >>>>>
> >>>>> result.writeAsText(...)
> >>>>> ```
> >>>>>
> >>>>> This code fails only when I run it on the full dataset; when I split
> >>>>> `data` into smaller chunks (`helper` always stays the same), I'm able to
> >>>>> complete successfully. I guess with smaller memory requirements
> >>>>> serialization/deserialization does not kick in.
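> >>>>>
> >>>>> (For anyone trying to reproduce the bisection, a hypothetical way to
> >>>>> take a deterministic slice of the input; not the exact code I used:)
> >>>>> ```
> >>>>> // Keep roughly a tenth of the records, chosen by key hash, so a
> >>>>> // failing run can be narrowed down to a subset of the input.
> >>>>> val chunk = data.filter(r => math.abs(r.name.hashCode % 10) == 0)
> >>>>> ```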
> >>>>>
> >>>>> I'm trying now to explicitly set the Protobuf serializer for Kryo:
> >>>>> ```
> >>>>> env.getConfig.registerTypeWithKryoSerializer(classOf[Record],
> >>>>>   classOf[ProtobufSerializer])
> >>>>> ```
> >>>>> but every run takes significant time before failing, so any other
> >>>>> advice is appreciated.
> >>>>>
> >>>>> Thanks,
> >>>>> Timur
> >>>>
> >>>>
> >>>
> >>
> >
> 
> 
> 
> 

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


