Hi Robert,

Thanks for your reply.

I set the akka.ask.timeout to 10k seconds just to see what happened. I tried different values, but none did the trick. My problem was solved by using a machine with more RAM. However, it was not clear that memory was the problem :)

Attached are the log and the Scala code of the transformation that I was running. The data file I am processing is around 57M lines (~1.7GB).

Let me know if you have any comments or suggestions.

Thanks again,
Frederick

On Fri, Jan 15, 2016 at 10:01 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Frederick,
>
> sorry for the delayed response.
>
> I have no idea what the problem could be.
> Has the exception been thrown from the env.execute() call?
> Why did you set the akka.ask.timeout to 10k seconds?
>
> On Wed, Jan 13, 2016 at 2:13 AM, Frederick Ayala <frederickay...@gmail.com> wrote:
>
>> Hi,
>>
>> I am having an error while running some Flink transformations in a
>> DataStream Scala API.
>>
>> The error I get is:
>>
>>   Timeout while waiting for JobManager answer. Job time exceeded
>>   21474835 seconds
>>   ...
>>   Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>   [Actor[akka://flink/user/$a#183984057]] after [21474835000 ms]
>>
>> This happens after a couple of minutes, not after 21474835 seconds...
>>
>> I tried different configurations, but no result so far:
>>
>>   val customConfiguration = new Configuration()
>>   customConfiguration.setInteger("parallelism", 8)
>>   customConfiguration.setInteger("jobmanager.heap.mb", 2560)
>>   customConfiguration.setInteger("taskmanager.heap.mb", 10240)
>>   customConfiguration.setInteger("taskmanager.numberOfTaskSlots", 8)
>>   customConfiguration.setInteger("taskmanager.network.numberOfBuffers", 16384)
>>   customConfiguration.setString("akka.ask.timeout", "10000 s")
>>   customConfiguration.setString("akka.lookup.timeout", "100 s")
>>   env = ExecutionEnvironment.createLocalEnvironment(customConfiguration)
>>
>> Any idea what the problem could be?
>>
>> Thanks!
>>
>> Frederick

--
Frederick Ayala
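A note on the heap settings quoted above, since more RAM is what fixed this: the sketch below rests on an assumption about Flink's local mode, namely that a LocalEnvironment runs the whole job inside the submitting JVM, so "jobmanager.heap.mb" and "taskmanager.heap.mb" from the Configuration are not what bounds memory; the JVM's own heap limit (-Xmx) is. That would explain why a machine with more RAM cured a failure that surfaced as a timeout. The memory fraction value below is a hypothetical tuning choice, not something confirmed in this thread:

  // Minimal sketch, assuming a 2016-era Flink local setup. The heap
  // cannot be raised via taskmanager.heap.mb here; pass a larger -Xmx
  // to the JVM that submits the job instead (e.g. -Xmx10g).
  import org.apache.flink.api.scala.ExecutionEnvironment
  import org.apache.flink.configuration.Configuration

  val conf = new Configuration()
  // Keep the ask timeout generous but finite so real failures surface.
  conf.setString("akka.ask.timeout", "100 s")
  // Hypothetical value: lower the managed-memory fraction so more of
  // the heap is left for user code (older versions defaulted to 0.7).
  conf.setFloat("taskmanager.memory.fraction", 0.5f)
  val env = ExecutionEnvironment.createLocalEnvironment(conf)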
The attached Scala code of the transformation:

// Imports needed by the snippet (not part of the original attachment):
import java.lang.Iterable
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Read (user, item, rating)-style triples from the CSV and keep rows
// at or above the implicit-rating threshold.
val datafile = env.readCsvFile[(String, String, String)](option_datafile,
    fieldDelimiter = ";", quoteCharacter = '|')
  .filter(_._3.toInt >= implicit_ratings_from)

// Sort each partition on the first field, group by it, and pair up
// consecutive items (second field) within each group as CoConsumed(i, j).
val ordered_user_timestamp = datafile
  .sortPartition(0, Order.ASCENDING)
  .groupBy(0)
  .reduceGroup(new GroupReduceFunction[(String, String, String), List[CoConsumed]] {
    override def reduce(values: Iterable[(String, String, String)],
                        out: Collector[List[CoConsumed]]): Unit = {
      val sorted_values = values.asScala.toList
      // sliding(2) emits one undersized window for single-element
      // groups, so filter those out before building pairs.
      out.collect(sorted_values.sliding(2).filter(_.size > 1)
        .map(x => new CoConsumed(x.head._2, x.tail.head._2)).toList)
    }
  })
  .flatMap(x => x) // flatten each group's list of pairs into one DataSet

// Support count of each successor item j; keep only the frequent ones.
val support_j = ordered_user_timestamp
  .map(x => (x.j, 1.0))
  .groupBy(0)
  .sum(1)
  .filter(_._2 > filter_pairs_frequencies)

// Keep pairs whose successor is frequent, count each distinct pair,
// and collect the counts to the driver as a Map.
val grouped_items = ordered_user_timestamp
  .join(support_j).where(_.j).equalTo(_._1)
  .map(x => (x._1, 1.0))
  .groupBy(0)
  .sum(1)
  .collect()
  .toMap[CoConsumed, Double]
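The attachment does not include the CoConsumed class itself. A minimal sketch of what it could look like, assuming it simply holds an ordered pair of item ids (the field names i and j are guesses based on the x.j accesses above):

  // Hypothetical definition; not part of the original attachment.
  // A case class provides equals/hashCode, which Flink relies on when
  // CoConsumed is used as the grouping and join key above.
  case class CoConsumed(i: String, j: String)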
Attachment: netflix_100_sample_05.out (binary data)