@Tarandeep and Flavio: +1 to Stephan's question.

Furthermore, I've created a branch which adds a simple CRC32 checksum
calculation over the network buffer content here:
https://github.com/uce/flink/tree/checksum

It would be great if you could run your job with a build from this
branch. It's based on the current 1.1 release branch. If you need
help building and running from this branch, feel free to ping me.

git clone https://github.com/uce/flink.git flink-uce
cd flink-uce
git checkout -b checksum origin/checksum
mvn clean install -DskipTests

The built binary distribution can be found in
flink-dist/target/flink-1.1-SNAPSHOT-bin/flink-1.1-SNAPSHOT. Just copy
your config files to the conf dir there and start Flink as usual in
that directory.

If the checksums don't match, the job should fail with an exception. If
this happens, it is likely that the problems are caused by data
corruption on the network layer. If not, it's more likely that there
is something off with the Kryo serializers.
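
As a rough illustration of the idea (this is not the actual code in the
branch, just a minimal sketch): the sender computes a CRC32 over the
buffer content, the receiver recomputes it and fails on a mismatch. The
class and method names below (BufferChecksum, checksum, verify) are made
up for this example; only java.util.zip.CRC32 and java.nio.ByteBuffer
are real JDK classes.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical helper, not part of Flink: shows a CRC32 check over buffer content.
final class BufferChecksum {

    // Computes the CRC32 of the remaining bytes in the buffer.
    // duplicate() shares the content but has its own position, so the
    // caller's buffer position is left untouched.
    static long checksum(ByteBuffer buffer) {
        CRC32 crc = new CRC32();
        crc.update(buffer.duplicate());
        return crc.getValue();
    }

    // Recomputes the checksum on the receiving side and fails loudly on mismatch.
    static void verify(ByteBuffer received, long expected) {
        long actual = checksum(received);
        if (actual != expected) {
            throw new IllegalStateException(
                "Checksum mismatch: expected " + expected + " but got " + actual);
        }
    }
}
```

With a check like this in place, a flipped bit in transit surfaces as an
explicit checksum failure instead of a confusing deserialization error
further downstream.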

(You can also follow this guide depending on your Hadoop requirements:
https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#hadoop-versions)


On Tue, Oct 4, 2016 at 7:10 PM, Stephan Ewen <se...@apache.org> wrote:
> It would be great to know if this only occurs in setups where Netty is
> involved (more than one TaskManager and at least one shuffle/rebalance)
> or also in one-TaskManager setups (which have local channels only).
>
> Stephan
>
> On Tue, Oct 4, 2016 at 11:49 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Tarandeep,
>>
>> it would be great if you could compile a small example data set with which
>> you're able to reproduce your problem. We could then try to debug it. It
>> would also be interesting to know whether Flavio's bug solves your problem
>> or not.
>>
>> Cheers,
>> Till
>>
>> On Mon, Oct 3, 2016 at 10:26 PM, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>>
>>> I think you're running into the same exception I face sometimes. I've
>>> opened a jira for it [1]. Could you please try to apply that patch and see
>>> if things get better?
>>>
>>> [1] https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-4719
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>> On 3 Oct 2016 22:09, "Tarandeep Singh" <tarand...@gmail.com> wrote:
>>>>
>>>> Now, when I ran it again (with lower task slots per machine) I got a
>>>> different error-
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> execution failed: Job execution failed.
>>>>     at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>     at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>     at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>>>     at
>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>>     at
>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>>     ....
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>     at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>     at
>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>     at
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>     at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>>     at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>>>>     at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>     at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>>>>     at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>     at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>     at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>     at
>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>>>     at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>>>     at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find
>>>> class: javaec40-d994-yteBuffer
>>>>     at
>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>     at
>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>     at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>     at
>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>     at
>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>     at
>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>     at
>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>     at
>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>     at
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>     at
>>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>     at
>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>     at
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.ClassNotFoundException: javaec40-d994-yteBuffer
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     at java.lang.Class.forName0(Native Method)
>>>>     at java.lang.Class.forName(Class.java:348)
>>>>     at
>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>     ... 15 more
>>>>
>>>>
>>>> -Tarandeep
>>>>
>>>> On Mon, Oct 3, 2016 at 12:49 PM, Tarandeep Singh <tarand...@gmail.com>
>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am using flink-1.0.0 and have been running ETL (batch) jobs on it for
>>>>> quite some time (a few months) without any problem. Starting this
>>>>> morning, I have been getting errors like these-
>>>>>
>>>>> "Received an event in channel 3 while still having data from a record.
>>>>> This indicates broken serialization logic. If you are using custom
>>>>> serialization code (Writable or Value types), check their serialization
>>>>> routines. In the case of Kryo, check the respective Kryo serializer."
>>>>>
>>>>> My datasets are in Avro format. The only thing that changed today is that
>>>>> I moved to a smaller cluster. When I first ran the ETL jobs, they failed
>>>>> with this error-
>>>>>
>>>>> "Insufficient number of network buffers: required 20, but only 10
>>>>> available. The total number of network buffers is currently set to 20000.
>>>>> You can increase this number by setting the configuration key
>>>>> 'taskmanager.network.numberOfBuffers'"
>>>>>
>>>>> I increased the number of buffers to 30k. I also decreased the number of
>>>>> slots per machine, as I noticed the load per machine was too high. After
>>>>> that, when I restart the jobs, I am getting the above error.
>>>>>
>>>>> Can someone please help me debug it?
>>>>>
>>>>> Thank you,
>>>>> Tarandeep
>>>>
>>>>
>>
>
