Hm, you were right. I checked all the files one by one and found an issue with a line in one of them... It's really unexpected for me, because I ran a Spark job on the same dataset and the "wrong" rows were filtered out without issues.
Thanks for the help!

Thank you,
Konstantin Kudryavtsev

On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <se...@apache.org> wrote:

> Ah, that makes sense!
>
> The problem is not in the core runtime, it is in the delimited input
> format. It probably looks for the line split character and never finds it,
> so it starts buffering a super large line (gigabytes), which leads to
> the OOM exception.
>
> Can you check whether the line split character and the encoding are
> properly defined?
>
> It would actually be good to define a max line length (sane default +
> configurable value) that reports when lines seem to exceed a maximum length
> (which is usually a misconfiguration of the split character).
>
> Greetings,
> Stephan
>
>
> On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <
> kudryavtsev.konstan...@gmail.com> wrote:
>
>> 10/08/2015 16:25:48 CHAIN DataSource (at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31) (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
>> java.lang.OutOfMemoryError: Java heap space
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 10/08/2015 16:25:48 Job execution switched to status FAILING.
>> 10/08/2015 16:25:48 DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1) switched to CANCELED
>> 10/08/2015 16:25:48 Job execution switched to status FAILED.
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>     at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>>     at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>>     at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>>
>> Thank you,
>> Konstantin Kudryavtsev
>>
>> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Can you paste the exception stack trace?
>>>
>>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <
>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>
>>>> It's a DataSet program that performs simple filtering, a cross join, and
>>>> aggregation.
>>>>
>>>> I'm using the Hadoop S3 FileSystem (not EMR's), because Flink's S3 connector
>>>> doesn't work at all.
>>>>
>>>> Currently I have 3 TaskManagers with 5k MB each, but I tried different
>>>> configurations and all of them lead to the same exception.
>>>>
>>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>>
>>>>> Can you give us a bit more background? What exactly is your program
>>>>> doing?
>>>>>
>>>>> - Are you running a DataSet program, or a DataStream program?
>>>>> - Is it one simple source that reads from S3, or are there multiple
>>>>> sources?
>>>>> - What operations do you apply on the CSV file?
>>>>> - Are you using Flink's S3 connector, or the Hadoop S3 file system?
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <
>>>>> kudryavtsev.konstan...@gmail.com> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> I'm running Flink on EMR with 2 m3.xlarge (each 16 GB RAM) and
>>>>>> trying to process 3.8 GB CSV data from S3.
>>>>>> I'm surprised that Flink failed with OutOfMemory: Java heap space.
>>>>>>
>>>>>> I tried to find the reason:
>>>>>> 1) identified the TaskManager process with the command ps aux | grep TaskManager
>>>>>> 2) then built a heap histogram:
>>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>>  num     #instances         #bytes  class name
>>>>>> ----------------------------------------------
>>>>>>    1:        131018     3763501304  [B
>>>>>>    2:         61022        7820352  <methodKlass>
>>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>>    7:         15696        1447168  [C
>>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>>    9:          5318         506000  java.lang.Class
>>>>>>
>>>>>>
>>>>>> Do you have any idea what the reason could be and how it can be fixed?
>>>>>> Does Flink use off-heap memory?
>>>>>>
>>>>>>
>>>>>> Thank you,
>>>>>> Konstantin Kudryavtsev
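
The diagnosis in this thread (a record whose delimiter is never found, so the reader keeps buffering) can be checked on a local copy of each input file before the job is run. Below is a minimal sketch of such a check, not something taken from the thread and not Flink's API: the function name find_oversized_records, the default threshold, and the chunk size are all illustrative assumptions. It reads the file in fixed-size chunks and only counts bytes, so it stays within bounded memory even when a delimiter is missing.

```python
def find_oversized_records(path, delimiter=b"\n", max_len=1_000_000, chunk_size=65536):
    """Return (record_index, start_offset, length) for every record longer than max_len.

    Hypothetical helper: only byte counts are kept in memory, never the record itself.
    """
    if len(delimiter) != 1:
        raise ValueError("this sketch only handles single-byte delimiters")

    oversized = []
    index = 0    # zero-based number of the current record
    start = 0    # byte offset where the current record begins
    length = 0   # bytes seen so far in the current record

    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            pos = 0
            while True:
                hit = chunk.find(delimiter, pos)
                if hit == -1:
                    # No delimiter in the rest of this chunk: the record continues.
                    length += len(chunk) - pos
                    break
                length += hit - pos
                if length > max_len:
                    oversized.append((index, start, length))
                # Move on to the record that starts right after the delimiter.
                index += 1
                start += length + 1
                length = 0
                pos = hit + 1

    # A trailing record without a final delimiter is checked here as well.
    if length > max_len:
        oversized.append((index, start, length))
    return oversized


if __name__ == "__main__":
    import sys

    for record in find_oversized_records(sys.argv[1]):
        print("record %d at byte offset %d is %d bytes long" % record)
```

If a file is reported as one single record of roughly its full size, the delimiter (or the encoding) is probably not what the reader expects, which is the misconfiguration described above.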