I don’t know if this is helpful, but I’d run into a similar issue (array index out of bounds during Kryo deserialization) due to having a different version of Kryo on the classpath.

— Ken
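If you suspect the same root cause, a quick diagnostic (my addition, not from the thread) is to print which jar actually provided the Kryo class at runtime; Flink 1.x builds against Kryo 2.24.0, so anything else winning on the classpath is a red flag:

```scala
object KryoClasspathCheck {
  def main(args: Array[String]): Unit = {
    val kryo = classOf[com.esotericsoftware.kryo.Kryo]
    // Which jar the Kryo class was actually loaded from at runtime:
    val location = Option(kryo.getProtectionDomain.getCodeSource).map(_.getLocation)
    println(s"Kryo loaded from: ${location.getOrElse("<unknown>")}")
    // The implementation version, if the jar manifest declares one:
    val version = Option(kryo.getPackage.getImplementationVersion)
    println(s"Kryo version: ${version.getOrElse("<not set>")}")
  }
}
```

Running this with the same classpath as the job (for example via `flink run`) shows whether a transitive dependency pulled in a conflicting Kryo build.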
On Apr 26, 2016, at 6:23pm, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

I built master with Scala 2.11 and Hadoop 2.7.1, and now get a different exception (still serialization-related, though):

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Then let's keep fingers crossed that we've found the culprit :-)

On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

Thank you Till.

I will try to run with the new binaries today. As I have mentioned, the error is reproducible only on the full dataset, so coming up with sample input data may be problematic (not to mention that the real data can't be shared). I'll see if I can replicate it, but it could take a bit longer. Thank you very much for your effort.

On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Timur,

I’ve got good and not so good news. Let’s start with the not so good news: I couldn’t reproduce your problem. The good news is that I found a bug in the duplication logic of the OptionSerializer. I’ve already committed a patch to the master to fix it.

Thus, I wanted to ask whether you could try out the latest master and check whether your problem still persists. If it does, could you send me your complete code with sample input data that reproduces the problem?

Cheers,
Till

On Tue, Apr 26, 2016 at 10:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Could this be caused by the disabled reference tracking in our Kryo serializer? From the stack trace it looks like it's failing when trying to deserialize the traits that are wrapped in Options.
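For context on Aljoscha's question: Kryo's reference tracking changes the wire format, so a writer and a reader that disagree on the setting will misread each other's bytes. Here is a standalone sketch (plain Kryo, not Flink code, and my own illustration rather than anything from the thread) of that mismatch; an IndexOutOfBoundsException in MapReferenceResolver.getReadObject, as in the trace above, is typical of a reader treating payload bytes as reference ids that were never written:

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object ReferenceTrackingMismatchDemo {
  def main(args: Array[String]): Unit = {
    val writer = new Kryo()
    writer.setReferences(true) // writer embeds reference ids in the stream

    val out = new Output(1024)
    writer.writeClassAndObject(out, "hello")

    val reader = new Kryo()
    reader.setReferences(false) // reader does not expect reference ids

    // Likely to throw or produce garbage, because the two byte layouts differ:
    val in = new Input(out.toBytes)
    println(reader.readClassAndObject(in))
  }
}
```

The same failure mode appears if two Kryo instances that should be independent end up sharing or diverging in state, which is why the OptionSerializer duplication bug Till mentions fits the symptoms.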
On Tue, 26 Apr 2016 at 10:09, Ufuk Celebi <u...@apache.org> wrote:

Hey Timur,

I'm sorry about this bad experience.

From what I can tell, there is nothing unusual with your code. It's probably an issue with Flink.

I think we have to wait a little longer to hear what others in the community say about this.

@Aljoscha, Till, Robert: any ideas what might cause this?

– Ufuk

On Mon, Apr 25, 2016 at 6:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

Still trying to resolve this serialization issue. I was able to hack around it by 'serializing' `Record` to a String and then 'deserializing' it in coGroup, but boy, it's so ugly.
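For anyone curious, a minimal sketch of what such a String round-trip workaround can look like (my reconstruction, not Timur's actual code; it assumes the `Record` case class shown just below, and uses plain Java serialization plus Base64 purely for illustration):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import javax.xml.bind.DatatypeConverter

object RecordCodec {
  // Encode a Record into a Base64 String, so Flink only ever ships Strings.
  def encode(r: Record): String = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(r)
    out.close()
    DatatypeConverter.printBase64Binary(buffer.toByteArray)
  }

  // Decode the String back into a Record inside the coGroup function.
  def decode(s: String): Record = {
    val bytes = DatatypeConverter.parseBase64Binary(s)
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[Record] finally in.close()
  }
}
```

The dataset is mapped through `encode` before the coGroup and `decode` is called on each element inside it, which sidesteps Flink's serializer for the problematic type at the cost of extra bytes and CPU.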
So the bug is that it can't deserialize a case class that has the following structure (slightly different and more detailed than I stated above):

```
case class Record(name: Name, phone: Option[Phone], address: Option[Address])

case class Name(givenName: Option[String], middleName: Option[String],
                familyName: Option[String], generationSuffix: Option[String] = None)

trait Address {
  val city: String
  val state: String
  val country: String
  val latitude: Double
  val longitude: Double
  val postalCode: String
  val zip4: String
  val digest: String
}

case class PoBox(city: String, state: String, country: String,
                 latitude: Double, longitude: Double,
                 postalCode: String, zip4: String, digest: String,
                 poBox: String) extends Address

case class PostalAddress(city: String, state: String, country: String,
                         latitude: Double, longitude: Double,
                         postalCode: String, zip4: String, digest: String,
                         preDir: String, streetName: String, streetType: String,
                         postDir: String, house: String, aptType: String,
                         aptNumber: String) extends Address
```

I would expect serialization to be one of Flink's cornerstones and to be well tested, so there is a high chance that I'm doing something wrong, but I can't really find anything unusual in my code.

Any suggestion on what to try is highly welcome.

Thanks,
Timur
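Since `Address` is a trait, the `PoBox` and `PostalAddress` values inside the `Option` fall back to Kryo with runtime subclass lookup. One thing worth trying (an assumption on my part, not advice given in the thread) is registering the concrete subtypes explicitly, so Kryo assigns them fixed registration ids instead of discovering them lazily during (de)serialization:

```scala
import org.apache.flink.api.scala.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
// Register the concrete Address implementations with Kryo up front.
env.getConfig.registerKryoType(classOf[PoBox])
env.getConfig.registerKryoType(classOf[PostalAddress])
```

This does not fix a broken serializer, but it removes one source of nondeterminism and tends to surface type-registration mismatches earlier and with clearer errors.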
On Sun, Apr 24, 2016 at 3:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

Hello Robert,

I'm on 1.0.0 compiled with Scala 2.11. The second exception was an issue with the cluster (which I didn't dig into); when I restarted the cluster I was able to get past it, so now I have the following exception:

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:158)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:159))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
    at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:254)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:259)
    at org.apache.flink.types.StringValue.readString(StringValue.java:771)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
    ... 5 more

Thanks,
Timur

On Sun, Apr 24, 2016 at 2:45 AM, Robert Metzger <rmetz...@apache.org> wrote:

For the second exception, can you check the logs of the failing taskmanager (10.105.200.137)? I guess these logs contain some details on why the TM timed out.

Are you on 1.0.x or on 1.1-SNAPSHOT? We recently changed something related to the ExecutionConfig which has led to Kryo issues in the past.

On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

Trying to use ProtobufSerializer -- the program consistently fails with the following exception:

java.lang.IllegalStateException: Update task on instance 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL: akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
    at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
    at akka.dispatch.OnFailure.internal(Future.scala:228)
    at akka.dispatch.OnFailure.internal(Future.scala:227)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
    at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]] after [10000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)

I'm at my wits' end now, any suggestions are highly appreciated.

Thanks,
Timur
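A note on the ProtobufSerializer attempt, assuming it is `com.twitter.chill.protobuf.ProtobufSerializer` (the one the Flink documentation suggests for protobuf types): that serializer only understands generated `com.google.protobuf.Message` subclasses, so registering it for a plain Scala case class such as `Record` would not work. A hedged sketch of the intended usage, with `AddressProto` as a stand-in for a hypothetical protoc-generated class:

```scala
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.scala.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
// AddressProto is hypothetical: it stands for a protoc-generated
// com.google.protobuf.Message subclass, the only kind of type this
// serializer can (de)serialize.
env.getConfig.registerTypeWithKryoSerializer(classOf[AddressProto], classOf[ProtobufSerializer])
```

Separately, the AskTimeoutException above is the JobManager timing out an RPC to the TaskManager; as Robert says, the TaskManager's own log is the place to look for the underlying cause.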
On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

Hello,

I'm running a Flink program that is failing with the following exception:

2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
    at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
    at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
    at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
    at scala.Option.foreach(Option.scala:257)
    at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
    at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) -> Filter (Filter at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
    at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
    at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
    at com.esotericsoftware.kryo.io.Input.readString(Input.java:465)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeStringField.read(UnsafeCacheFields.java:198)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:764)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:67)
    at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

The simplified version of the code looks more or less like the following:

```
case class Name(first: String, last: String)
case class Phone(number: String)
case class Address(addr: String, city: String, country: String)
case class Record(n: Name, phone: Option[Phone], addr: Option[Address])
...
def resolutionFunc: (Iterator[Record], Iterator[(Name, String)]) => String = ...
...
val data = env.readCsvFile[MySchema](...).map(Record(_))

val helper: DataSet[(Name, String)] = ...

val result = data.filter(_.addr.isDefined)
  .coGroup(helper)
  .where(e => LegacyDigest.buildMessageDigest((e.n, e.addr.get.country)))
  .equalTo(e => LegacyDigest.buildMessageDigest((e._1, e._2)))
  .apply {resolutionFunc}
  .filter(_ != "")

result.writeAsText(...)
```
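Since the failure only shows up on the full dataset, one way to hunt for a poison record without a full cluster run (a sketch under my own assumptions, not something proposed in the thread) is to round-trip each input record through exactly the serializer Flink would pick for `Record`:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
// Note: these wrapper classes have moved between packages across Flink
// versions; in some releases they live under org.apache.flink.runtime.util.
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}

// Serialize and immediately deserialize one record with the same
// TypeSerializer the job would use; running this over the production
// input can pinpoint the first record that breaks.
def roundTrip(record: Record): Record = {
  val serializer = createTypeInformation[Record].createSerializer(new ExecutionConfig)
  val bytes = new ByteArrayOutputStream()
  serializer.serialize(record, new DataOutputViewStreamWrapper(bytes))
  serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray)))
}
```

A caveat: this exercises a single serializer instance on one thread, so it may not trip the duplication bug Till found, but it does rule out per-record data problems.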
The pipeline above fails only when I run it on the full dataset; when I split `data` into smaller chunks (`helper` always stays the same), I'm able to complete successfully. I guess with smaller memory requirements serialization/deserialization does not kick in.

I'm now trying to explicitly set the Protobuf serializer for Kryo:
```
env.getConfig.registerTypeWithKryoSerializer(classOf[Record], classOf[ProtobufSerializer])
```
but every run takes significant time before failing, so any other advice is appreciated.

Thanks,
Timur

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr