Hello Fabian,

Thank you for your response. I tried setting the solution set to unmanaged and 
got a different error:

2017-10-24 20:46:11.473 [Join (join solution trees) (1/8)] ERROR 
org.apache.flink.runtime.operators.BatchTask  - Error in task code:  Join (join 
solution trees) (1/8)
java.lang.NullPointerException: null
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at 
org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:207)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

I initially thought this was caused by a null value in a solution set tuple, so I 
added assertions to ensure that tuple values were never null. However, I’m still 
getting the above error. Did changing the solution set to unmanaged cause the 
tuples to be serialized? Is there another reason, aside from null values, that 
this error might be thrown?

Thank you,

Joshua

On Oct 25, 2017, at 3:12 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Joshua,

That is correct: delta iterations cannot spill to disk. The solution set is 
managed in an in-memory hash table, and spilling that hash table to disk would 
have a significant impact on performance.

By default, the hash table is organized in Flink's managed memory.
You can try to increase the managed memory size (tweaking managed memory vs. 
heap memory, increasing heap memory, ...) or add more resources and increase 
the parallelism.
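For reference, the managed-memory knobs mentioned above live in flink-conf.yaml; 
the keys below are the relevant ones, and the values are purely illustrative, 
not recommendations for your job:

```yaml
# Illustrative flink-conf.yaml settings -- values are examples only
taskmanager.heap.mb: 4096          # total TaskManager heap size
taskmanager.memory.fraction: 0.8   # share of free heap given to managed memory (default 0.7)
```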
Alternatively, you can store the solution set in a Java HashMap on the heap by 
setting the solution set to unmanaged 
(DeltaIteration.setSolutionSetUnManaged(true)).
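A minimal sketch of that switch follows; the tuple types, placeholder data, and 
trivial iteration body are mine for illustration, not taken from your job, and 
the snippet assumes the Flink batch (DataSet) API is on the classpath:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnmanagedSolutionSet {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder data; the initial workset is reused as the initial solution set.
        DataSet<Tuple2<Long, Long>> initial = env.fromElements(
                Tuple2.of(1L, 10L), Tuple2.of(2L, 20L));

        // Key field 0 identifies entries in the solution set; at most 10 iterations.
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                initial.iterateDelta(initial, 10, 0);

        // Keep the solution set in a heap HashMap instead of Flink managed memory.
        iteration.setSolutionSetUnManaged(true);

        // Trivial body: pass the workset through as both the delta and new workset.
        DataSet<Tuple2<Long, Long>> next = iteration.getWorkset();
        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);

        result.print();
    }
}
```

A real job would of course replace the trivial body with the joins and updates 
that produce the solution set delta and the next workset.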

Best, Fabian


2017-10-24 21:09 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
I’m currently using a delta iteration within a batch job and received the 
following error:

java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 
32 minPartition: 11 maxPartition: 24 number of overflow segments: 0 bucketSize: 
125 Overall memory: 23232512 Partition memory: 18350080 Message: null
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
at 
org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:96)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

It looks like the job ran out of Flink managed memory. Can delta iterations not 
spill to disk?

Thanks,

Joshua
