Re: question on ValueState

2021-02-10 Thread Roman Khachatryan
Right, in this case FileSystemStateBackend is the right choice.
The state size is limited by TM memory as you said.

Regards,
Roman
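Roman's answer implies a per-TaskManager budget. With a heap-based backend, each TaskManager keeps the keyed state for its own share of the keys on its heap. The minimal sketch below shows that arithmetic. It assumes keys are spread evenly across TaskManagers. The class name and the `heapOverheadFactor` parameter are hypothetical: live Java objects can take more memory than their checkpointed form, so that factor has to be measured for a given job.

```java
// Back-of-envelope sizing for heap-based keyed state (hypothetical helper, not a Flink API).
// Assumptions: keys are spread evenly across TaskManagers, and the on-heap footprint is
// the checkpoint size multiplied by an overhead factor measured for the specific job.
final class HeapStateSizing {

    /** Estimated on-heap keyed state per TaskManager, in GB. */
    static double perTaskManagerGb(double totalStateGb, int numTaskManagers, double heapOverheadFactor) {
        if (numTaskManagers <= 0) {
            throw new IllegalArgumentException("numTaskManagers must be positive");
        }
        return totalStateGb * heapOverheadFactor / numTaskManagers;
    }

    /** True if the estimate fits in the heap budget reserved for state on each TaskManager. */
    static boolean fits(double totalStateGb, int numTaskManagers,
                        double heapOverheadFactor, double stateHeapBudgetGb) {
        return perTaskManagerGb(totalStateGb, numTaskManagers, heapOverheadFactor) <= stateHeapBudgetGb;
    }
}
```

For the job in the quoted message (about 10 GB over 10 TaskManagers), `perTaskManagerGb(10.0, 10, 1.0)` gives 1.0 GB before any overhead. Compare the result with the heap actually available for state, which is smaller than a machine's total memory.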


On Tue, Feb 9, 2021 at 8:54 AM yidan zhao  wrote:

> What I am interested in is whether I should replace FsStateBackend with
> RocksDB. RocksDB performs worse, although it can hold very large state.
> Currently, my job's state is about 10 GB, and I run 10 TaskManagers on
> different machines, each with 100 GB of memory. I don't think I should use
> RocksDB; is that right?


Re: question on ValueState

2021-02-08 Thread yidan zhao
I have a related question.
Since FsStateBackend keeps state on the heap and checkpoints are ultimately
stored in the filesystem, does JobManager or TaskManager memory limit the
state size? Is the state size limited by TM memory times the number of TMs,
or by JM memory?


On Mon, Feb 8, 2021 at 6:05 PM Khachatryan Roman wrote:

> Hi,
>
> I think Yun Tang is right: the heap state backend doesn't (de)serialize
> the value on update().
> As for value(), it may (de)serialize the value and return a copy if an
> async snapshot is in progress (to protect the snapshot from modifications).
> This shouldn't happen often, though.
>
> Regards,
> Roman


Re: question on ValueState

2021-02-08 Thread yidan zhao
What I am interested in is whether I should replace FsStateBackend with
RocksDB. RocksDB performs worse, although it can hold very large state.
Currently, my job's state is about 10 GB, and I run 10 TaskManagers on
different machines, each with 100 GB of memory. I don't think I should use
RocksDB; is that right?

On Tue, Feb 9, 2021 at 3:50 PM yidan zhao wrote:

> I have a related question.
> Since FsStateBackend keeps state on the heap and checkpoints are ultimately
> stored in the filesystem, does JobManager or TaskManager memory limit the
> state size? Is the state size limited by TM memory times the number of TMs,
> or by JM memory?


Re: question on ValueState

2021-02-08 Thread Khachatryan Roman
Hi,

I think Yun Tang is right: the heap state backend doesn't (de)serialize the
value on update().
As for value(), it may (de)serialize the value and return a copy if an async
snapshot is in progress (to protect the snapshot from modifications). This
shouldn't happen often, though.

Regards,
Roman
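Roman's description of value() returning a copy only while an async snapshot is running amounts to copy-on-write. The sketch below is a simplified, hypothetical model of that idea. The class, its methods, and the version counters are invented for illustration and are not Flink's implementation. It shows why in-place mutations stay visible in this model: the copy handed out during a snapshot becomes the new live value, while the snapshot keeps the old object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Simplified, hypothetical model of a copy-on-write read during an async snapshot.
// This is not Flink's code; names and structure are illustrative only.
final class CowValueStore<K, V> {

    private static final class Entry<T> {
        final T value;
        final int version; // store version in which this entry was written

        Entry(T value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<K, Entry<V>> live = new HashMap<>();
    private final UnaryOperator<V> copier; // deep copy, standing in for a serializer-based copy
    private int currentVersion = 0;
    private int snapshotVersion = -1;      // -1 means no snapshot is running

    CowValueStore(UnaryOperator<V> copier) {
        this.copier = copier;
    }

    void update(K key, V value) {
        live.put(key, new Entry<>(value, currentVersion));
    }

    V value(K key) {
        Entry<V> e = live.get(key);
        if (e == null) {
            return null;
        }
        if (snapshotVersion >= 0 && e.version <= snapshotVersion) {
            // The running snapshot still references e.value: hand out a copy and make
            // that copy the live value, so later in-place mutations do not touch the snapshot.
            V copy = copier.apply(e.value);
            live.put(key, new Entry<>(copy, currentVersion));
            return copy;
        }
        return e.value; // usual case: the stored object itself, no copy
    }

    /** Starts a snapshot that keeps references to the current values. */
    Map<K, V> startSnapshot() {
        snapshotVersion = currentVersion;
        currentVersion++;
        Map<K, V> view = new HashMap<>();
        live.forEach((k, entry) -> view.put(k, entry.value));
        return view;
    }

    /** Ends the snapshot; reads return stored objects directly again. */
    void finishSnapshot() {
        snapshotVersion = -1;
    }
}
```

Outside a snapshot, value() returns the stored object itself, which matches what Edward observed.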


On Mon, Feb 8, 2021 at 3:24 AM Yun Tang  wrote:

> Hi,
>
> MemoryStateBackend and FsStateBackend both hold keyed state in
> HeapKeyedStateBackend [1], whose main data structure is StateTable [2],
> which holds plain Java objects (POJOs). In other words, the object is not
> serialized when update() is called.
> RocksDBStateBackend, on the other hand, stores values as serialized
> bytes.
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>
> Best
> Yun Tang


Re: question on ValueState

2021-02-07 Thread Yun Tang
Hi,

MemoryStateBackend and FsStateBackend both hold keyed state in
HeapKeyedStateBackend [1], whose main data structure is StateTable [2], which
holds plain Java objects (POJOs). In other words, the object is not serialized
when update() is called.
RocksDBStateBackend, on the other hand, stores values as serialized bytes.


[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java

Best
Yun Tang
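The difference Yun Tang describes, object references on the heap versus serialized bytes in RocksDB, decides whether Edward's mutate-without-update() pattern keeps its changes. The sketch below is a hypothetical pair of in-memory models (`HeapStore` and `SerializedStore` are invented names, not Flink classes). It uses java.io serialization as a stand-in for Flink's serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical models (not Flink classes) of the two storage strategies described above.
final class StateStores {

    /** Heap-style: keeps the object reference itself. */
    static final class HeapStore<T> {
        private final Map<String, T> table = new HashMap<>();

        T value(String key) {
            return table.get(key);
        }

        void update(String key, T value) {
            table.put(key, value);
        }
    }

    /** Bytes-style: keeps serialized bytes, so every read deserializes a fresh object. */
    static final class SerializedStore<T extends Serializable> {
        private final Map<String, byte[]> table = new HashMap<>();

        @SuppressWarnings("unchecked")
        T value(String key) {
            byte[] bytes = table.get(key);
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        void update(String key, T value) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            table.put(key, buf.toByteArray());
        }
    }
}
```

In the heap model, mutating the object returned by value() without calling update() keeps the change. In the bytes model, that change is lost unless update() is called after the mutation. This suggests that calling update() after mutating is the pattern that behaves the same on both kinds of backend.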


From: Colletta, Edward 
Sent: Sunday, February 7, 2021 19:53
To: user@flink.apache.org 
Subject: question on ValueState



question on ValueState

2021-02-07 Thread Colletta, Edward
Using FsStateBackend.

I was under the impression that ValueState.value() serializes the object stored
in the local state backend, copies the serialized bytes, and deserializes them.
Likewise, update() would perform the same steps to copy the object back into
the local state backend. As a consequence, storing collections in ValueState
would be much less efficient than using ListState or MapState where possible.
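The efficiency concern above applies to a backend that stores serialized bytes. Per Yun Tang's reply, RocksDB does this and the heap backends do not. A collection kept in one ValueState must be re-serialized in full on every update(), while a map-style state can write a single entry. The sketch below is a rough, hypothetical illustration. It uses java.io serialization as a stand-in for Flink's serializers, so the absolute byte counts are not Flink's. The class and method names are invented.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

// Hypothetical illustration: bytes written per update under a "serialized bytes" backend.
// java.io serialization stands in for Flink's TypeSerializers.
final class UpdateCost {

    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.size();
    }

    /** Whole collection in one value: appending one element rewrites the entire list. */
    static int valueStateAppendCost(int existingElements) {
        ArrayList<Long> list = new ArrayList<>();
        for (long i = 0; i <= existingElements; i++) { // existing elements plus the new one
            list.add(i);
        }
        return serializedSize(list);
    }

    /** Map-style state: adding one entry writes only that key and value. */
    static int mapStatePutCost(long key, long value) {
        return serializedSize(key) + serializedSize(value);
    }
}
```

The list cost grows with the number of stored elements, while the per-entry cost stays constant.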

However, I am looking at some code I wrote a while ago that assumed value()
simply returns a reference to the object. The code calls update() only when
value() returns null and a new object is created. Yet the code works: all
changes to the object stored in state are visible the next time value() is
called. I have some sample code below.

Can someone clarify what really happens when value() is called?


    public void processElement(M in, Context ctx, Collector out) throws Exception {
        MyWindow myWindow = windowState.value();
        if (myWindow == null) {
            // First element for this key: schedule a timer at the next interval boundary.
            ctx.timerService().registerProcessingTimeTimer(
                ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
            myWindow = new MyWindow(0L, slide, windowSize);
            windowState.update(myWindow);
            // The object is mutated after update() has been called.
            myWindow.eq.add(0L);
        }
        // Updates the state object in place; update() is not called again.
        myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        // Re-arm the timer for the next interval boundary.
        ctx.timerService().registerProcessingTimeTimer(
            ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
        // Mutates the state object in place and emits the accumulated value.
        MyWindow myWindow = windowState.value();
        myWindow.slide(0L);
        out.collect(myWindow.globalAccum);
    }
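The timer expression in both methods rounds the current processing time up to the next multiple of interval, so timers for all keys land on aligned boundaries. The sketch below is a small stand-alone version of that arithmetic. The class and method names are hypothetical. It assumes non-negative timestamps and a positive interval.

```java
// Hypothetical helper mirroring the expression used in processElement/onTimer:
//   ((currentProcessingTime + interval) / interval) * interval
final class TimerMath {

    /** Returns the first multiple of intervalMillis strictly greater than nowMillis (nowMillis >= 0). */
    static long nextBoundary(long nowMillis, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        // Integer division truncates, which rounds (now + interval) down to a multiple of interval.
        return ((nowMillis + intervalMillis) / intervalMillis) * intervalMillis;
    }
}
```

With a 1000 ms interval, a call at 1234 ms yields 2000 ms, and a call at exactly 2000 ms yields 3000 ms. The computed time is therefore always strictly in the future.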