Hi Robert,

Thanks for your reply.

I set the akka.ask.timeout to 10k seconds just to see what would happen. I
tried different values, but none did the trick.

My problem was solved by using a machine with more RAM. However, it was not
clear from the error that memory was the problem :)
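
By the way, in case it helps someone hitting the same thing: before tuning
the Flink config, it is worth checking how much heap the local JVM actually
gets. A minimal check using the standard java.lang.Runtime API (my
assumption being that in a local environment the TaskManager runs inside
the client JVM, so -Xmx matters more than taskmanager.heap.mb):

  // Print the current JVM's heap ceiling. If this is far below what the job
  // needs, raise -Xmx (assumption: in local mode the TaskManager shares this
  // JVM, so taskmanager.heap.mb on its own won't help).
  object HeapCheck {
    def main(args: Array[String]): Unit = {
      val maxMb = Runtime.getRuntime.maxMemory / (1024 * 1024)
      println(s"JVM max heap: $maxMb MB")
    }
  }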

Attached are the log and the Scala code of the transformation that I was
running.

The data file I am processing is around 57M lines (~1.7GB).

Let me know if you have any comments or suggestions.

Thanks again,

Frederick



On Fri, Jan 15, 2016 at 10:01 AM, Robert Metzger <rmetz...@apache.org>
wrote:

> Hi Frederick,
>
> sorry for the delayed response.
>
> I have no idea what the problem could be.
> Has the exception been thrown from the env.execute() call?
> Why did you set the akka.ask.timeout to 10k seconds?
>
>
>
>
> On Wed, Jan 13, 2016 at 2:13 AM, Frederick Ayala <frederickay...@gmail.com> wrote:
>
>> Hi,
>>
>> I am getting an error while running some Flink transformations using
>> the DataSet Scala API.
>>
>> The error I get is:
>>
>> Timeout while waiting for JobManager answer. Job time exceeded 21474835
>> seconds
>> ...
>>
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/$a#183984057]] after [21474835000 ms]
>>
>>
>> This happens after a couple of minutes, not after 21474835 seconds...
>>
>> I tried different configurations, but none of them helped:
>>
>>       val customConfiguration = new Configuration()
>>       customConfiguration.setInteger("parallelism", 8)
>>       customConfiguration.setInteger("jobmanager.heap.mb", 2560)
>>       customConfiguration.setInteger("taskmanager.heap.mb", 10240)
>>       customConfiguration.setInteger("taskmanager.numberOfTaskSlots", 8)
>>       customConfiguration.setInteger("taskmanager.network.numberOfBuffers", 16384)
>>       customConfiguration.setString("akka.ask.timeout", "10000 s")
>>       customConfiguration.setString("akka.lookup.timeout", "100 s")
>>       env = ExecutionEnvironment.createLocalEnvironment(customConfiguration)
>>
>> Any idea what the problem could be?
>>
>> Thanks!
>>
>> Frederick
>>
>
>


-- 
Frederick Ayala
Attached Scala code:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.common.operators.Order
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Read (user, item, rating) triples; keep rows at or above the implicit-rating threshold.
val datafile = env.readCsvFile[(String, String, String)](
    option_datafile, fieldDelimiter = ";", quoteCharacter = '|')
  .filter(_._3.toInt >= implicit_ratings_from)

// Group by user (field 0) and emit each consecutive pair of items as CoConsumed(i, j).
val ordered_user_timestamp = datafile
  .sortPartition(0, Order.ASCENDING)
  .groupBy(0)
  .reduceGroup(new GroupReduceFunction[(String, String, String), List[CoConsumed]] {
    override def reduce(values: java.lang.Iterable[(String, String, String)],
                        out: Collector[List[CoConsumed]]): Unit = {
      val sorted_values = values.asScala.toList
      // sliding(2) yields 2-element windows; the size filter drops a trailing 1-element window
      out.collect(sorted_values.sliding(2).filter(_.size > 1)
        .map(x => new CoConsumed(x.head._2, x.tail.head._2)).toList)
    }
  })
  .flatMap(x => x) // flatten each List[CoConsumed] into individual records

// Count how often each item appears as the second element of a pair; keep frequent ones.
val support_j = ordered_user_timestamp
  .map(x => (x.j, 1.0))
  .groupBy(0).sum(1)
  .filter(_._2 > filter_pairs_frequencies)

// Count each pair whose j is frequent. collect() pulls the whole result
// into the client JVM, so this step needs a lot of client-side memory.
val grouped_items = ordered_user_timestamp
  .join(support_j).where(_.j).equalTo(_._1)
  .map(x => (x._1, 1.0))
  .groupBy(0).sum(1)
  .collect().toMap[CoConsumed, Double]
Attachment: netflix_100_sample_05.out