Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread Antonio Si
Thanks Bastien. I will check it out.

Antonio.

On Thu, Feb 10, 2022 at 11:59 AM bastien dine 
wrote:

> I haven't used s3 with Flink, but according to this doc :
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/filesystems/s3/
> You can setup pretty easily s3 and use it with s3://path/to/your/file with
> a write sink
> The page talk about DataStream but it should work with DataSet (
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/dataset/overview/#data-sinks
> )
>
> Maybe someone else will have more information about s3 dataset sink
>
> Regards,
>
>
> Le jeu. 10 févr. 2022 à 20:52, Antonio Si  a écrit :
>
>> Thanks Bastien. Can you point to an example of using a sink as we are
>> planning to write to S3?
>>
>> Thanks again for your help.
>>
>> Antonio.
>>
>> On Thu, Feb 10, 2022 at 11:49 AM bastien dine 
>> wrote:
>>
>>> Hello Antonio,
>>>
>>> .collect() method should be use with caution as it's collecting the
>>> DataSet (multiple partitions on multiple TM) into a List single list on JM
>>> (so in memory)
>>> Unless you have a lot of RAM, you can not use it this way and you
>>> probably should not
>>> I recommend you to use a sink to print it into a formatted file instead
>>> (like CSV one) or if it's too big, into something splittable
>>>
>>> Regards,
>>> Bastien
>>>
>>> --
>>>
>>> Bastien DINE
>>> Data Architect / Software Engineer / Sysadmin
>>> bastiendine.io
>>>
>>>
>>> Le jeu. 10 févr. 2022 à 20:32, Antonio Si  a
>>> écrit :
>>>
>>>> Hi,
>>>>
>>>> I am using the stateful processing api to read the states from a
>>>> savepoint file.
>>>> It works fine when the state size is small, but when the state size is
>>>> larger, around 11GB, I am getting an OOM. I think it happens when it is
>>>> doing a dataSource.collect() to obtain the states. The stackTrace is copied
>>>> at the end of the message.
>>>>
>>>> Any suggestions or hints would be very helpful.
>>>>
>>>> Thanks in advance.
>>>>
>>>> Antonio.
>>>>
>>>> java.lang.OutOfMemoryError: null
>>>> at
>>>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
>>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
>>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>> at
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
>>>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>>>> at
>>>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>>> ~[?:1.8.0_282]
>>>> at
>>>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
>>>> ~[?:1.8.0_282]
>>

Re: question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread Antonio Si
Thanks Bastien. Can you point to an example of using a sink as we are
planning to write to S3?

Thanks again for your help.

Antonio.

On Thu, Feb 10, 2022 at 11:49 AM bastien dine 
wrote:

> Hello Antonio,
>
> .collect() method should be use with caution as it's collecting the
> DataSet (multiple partitions on multiple TM) into a List single list on JM
> (so in memory)
> Unless you have a lot of RAM, you can not use it this way and you probably
> should not
> I recommend you to use a sink to print it into a formatted file instead
> (like CSV one) or if it's too big, into something splittable
>
> Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> Le jeu. 10 févr. 2022 à 20:32, Antonio Si  a écrit :
>
>> Hi,
>>
>> I am using the stateful processing api to read the states from a
>> savepoint file.
>> It works fine when the state size is small, but when the state size is
>> larger, around 11GB, I am getting an OOM. I think it happens when it is
>> doing a dataSource.collect() to obtain the states. The stackTrace is copied
>> at the end of the message.
>>
>> Any suggestions or hints would be very helpful.
>>
>> Thanks in advance.
>>
>> Antonio.
>>
>> java.lang.OutOfMemoryError: null
>> at
>> java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> ~[?:1.8.0_282]
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> ~[?:1.8.0_282]
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> ~[?:1.8.0_282]
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>> ~[?:1.8.0_282]
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> ~[?:1.8.0_282]
>> at
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>> ~[?:1.8.0_282]
>> at
>> java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> ~[?:1.8.0_282]
>> at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>> ~[?:1.8.0_282]
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
>> ~[flink-dist_2.11-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
>> ~[flink-dist_2.11-1.12.2.jar:1.12

question on dataSource.collect() on reading states from a savepoint file

2022-02-10 Thread Antonio Si
Hi,

I am using the stateful processing api to read the states from a savepoint
file.
It works fine when the state size is small, but when the state size is
larger, around 11GB, I am getting an OOM. I think it happens when it is
doing a dataSource.collect() to obtain the states. The stackTrace is copied
at the end of the message.

Any suggestions or hints would be very helpful.

Thanks in advance.

Antonio.

java.lang.OutOfMemoryError: null
at
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
~[?:1.8.0_282]
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
~[?:1.8.0_282]
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
~[?:1.8.0_282]
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
~[?:1.8.0_282]
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[?:1.8.0_282]
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.util.SerializedValue.(SerializedValue.java:62)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.serializeAccumulator(ExecutionGraph.java:806)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$getAccumulatorsSerialized$0(ExecutionGraph.java:795)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
~[?:1.8.0_282]
at
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
~[?:1.8.0_282]
at
java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1699)
~[?:1.8.0_282]
at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
~[?:1.8.0_282]
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
~[?:1.8.0_282]
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
~[?:1.8.0_282]
at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
~[?:1.8.0_282]
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
~[?:1.8.0_282]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.getAccumulatorsSerialized(ExecutionGraph.java:792)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:325)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.scheduler.SchedulerBase.requestJob(SchedulerBase.java:810)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.jobmaster.JobMaster.jobStatusChanged(JobMaster.java:1085)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.jobmaster.JobMaster.access$2200(JobMaster.java:131)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.jobmaster.JobMaster$JobManagerJobStatusListener.lambda$jobStatusChanges$0(JobMaster.java:1356)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
~[flink-dist_2.11-1.12.2.jar:1.12.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)