Re: State Maintenance

2017-09-08 Thread Fabian Hueske
Only KeyedState can be used as queryable state, so you cannot query
OperatorState.
AFAIK, it should not be a problem for an operator to have both OperatorState
and queryable KeyedState.
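
For reference, a rough (untested) sketch of making keyed MapState queryable
from a rich function on a keyed stream; the class, state, and query names
below are just examples:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class UserStateFunction extends RichFlatMapFunction<String, String> {

    private transient MapState<String, Long> userState;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("user-state", String.class, Long.class);
        // Expose this keyed state to queryable state clients under "user-state-query".
        descriptor.setQueryable("user-state-query");
        userState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Placeholder logic: count occurrences per key in the queryable map state.
        Long count = userState.get(value);
        userState.put(value, count == null ? 1L : count + 1);
        out.collect(value);
    }
}

A state registered this way can then be looked up externally via the
QueryableStateClient.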

2017-09-07 17:01 GMT+02:00 Navneeth Krishnan :

> Will I be able to use both queryable MapState and union list state while
> implementing the CheckpointedFunction interface? Because one of my major
> requirement on that operator is to provide a queryable state and in order
> to compute that state we need the common static state across all parallel
> operator instances.
>
> Thanks.
>
> On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske  wrote:
>
>> Hi Navneeth,
>>
>> there's a lower level state interface that should address your
>> requirements: OperatorStateStore.getUnionListState()
>>
>> This union list state is similar to the regular operator list state, but
>> instead of splitting the list for recovery and giving out splits to
>> operator instance, it restores the complete list on each operator instance.
>> So it basically does a broadcast restore. If all operator have the same
>> state, only one instance checkpoints its state and this state is restored
>> to all other instances in case of a failure. This should also work with
>> rescaling.
>> The operator instance to checkpoint can be identified by
>> (RuntimeContext.getIndexOfThisSubtask == 0).
>>
>> The OperatorStateStore is a bit hidden. You have to implement the
>> CheckpointedFunction interface. When CheckpointedFunction.initializ
>> eState(FunctionInitializationContext context) is called context has a
>> method getOperatorStateStore().
>>
>> I'd recommend to have a look at the detailed JavaDocs of all involved
>> classes and methods.
>>
>> Hope this helps,
>> Fabian
>>
>>
>> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan :
>>
>>> Thanks Gordon for your response. I have around 80 parallel flatmap
>>> operator instances and each instance requires 3 states. Out of which one is
>>> user state in which each operator will have unique user's data and I need
>>> this data to be queryable. The other two states are kind of static states
>>> which are only modified when there an update message in config stream. This
>>> static data could easily be around 2GB and in my previous approach I used
>>> operator state where the data is retrieved inside open method across all
>>> operator instances whereas checkpointed only inside one of the operator
>>> instance.
>>>
>>> One of the issue that I have is if I change the operator parallelism how
>>> would it affect the internal state?
>>>
>>>
>>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai >> > wrote:
>>>
 Hi Navneeth,

 Answering your three questions separately:

 1. Yes. Your MapState will be backed by RocksDB, so when removing an
 entry
 from the map state, the state will be removed from the local RocksDB as
 well.

 2. If state classes are not POJOs, they will be serialized by Kryo,
 unless a
 custom serializer is specifically specified otherwise. You can take a
 look
 at this document on how to do that [1].

 3. I might need to know more information to be able to suggest properly
 for
 this one. How are you using the "huge state values"? From what you
 described, it seems like you only need it on one of the parallel
 instances,
 so I'm a bit curious on what they are actually used for. Are they needed
 when processing your records?

 Cheers,
 Gordon

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/stream/state.html#custom-serialization-for-managed-state



 --
 Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nab
 ble.com/

>>>
>>>
>>
>


Re: State Maintenance

2017-09-07 Thread Navneeth Krishnan
Will I be able to use both queryable MapState and union list state while
implementing the CheckpointedFunction interface? One of my major
requirements for that operator is to provide queryable state, and in order
to compute that state we need the common static state across all parallel
operator instances.

Thanks.


Re: State Maintenance

2017-09-07 Thread Fabian Hueske
Hi Navneeth,

there's a lower level state interface that should address your
requirements: OperatorStateStore.getUnionListState()

This union list state is similar to the regular operator list state, but
instead of splitting the list for recovery and handing out the splits to the
operator instances, it restores the complete list on each operator instance.
So it basically does a broadcast restore. If all operators have the same
state, only one instance needs to checkpoint its state, and that state is
restored to all instances in case of a failure. This should also work with
rescaling.
The operator instance that should checkpoint can be identified by
(RuntimeContext.getIndexOfThisSubtask() == 0).

The OperatorStateStore is a bit hidden. You have to implement the
CheckpointedFunction interface; when
CheckpointedFunction.initializeState(FunctionInitializationContext context)
is called, the context provides a getOperatorStateStore() method.
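
A rough, untested sketch of this pattern (the class, state name, and element
type are just examples) could look like this:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class StaticConfigFunction extends RichFlatMapFunction<String, String>
        implements CheckpointedFunction {

    // In-memory copy of the "static" data shared by all parallel instances.
    private transient List<String> staticConfig;
    private transient ListState<String> unionState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("static-config", String.class);
        // Union list state: on restore, every parallel instance gets the complete list.
        unionState = context.getOperatorStateStore().getUnionListState(descriptor);

        staticConfig = new ArrayList<>();
        if (context.isRestored()) {
            for (String entry : unionState.get()) {
                staticConfig.add(entry);
            }
        }
        // else: load or build the initial data here
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionState.clear();
        // Only subtask 0 writes the data, so it is stored once per checkpoint.
        if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
            for (String entry : staticConfig) {
                unionState.add(entry);
            }
        }
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        // Actual processing using staticConfig would go here.
        out.collect(value);
    }
}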

I'd recommend having a look at the detailed JavaDocs of all the involved
classes and methods.

Hope this helps,
Fabian


Re: State Maintenance

2017-09-05 Thread Navneeth Krishnan
Thanks Gordon for your response. I have around 80 parallel flatmap operator
instances, and each instance requires 3 states. One of them is user state,
where each operator instance holds unique user data, and I need this data to
be queryable. The other two are essentially static states, which are only
modified when there is an update message in the config stream. This static
data could easily be around 2 GB, and in my previous approach I used
operator state, where the data is retrieved inside the open() method on all
operator instances but checkpointed only in one of them.

One issue I have is: if I change the operator parallelism, how would that
affect the internal state?


Re: State Maintenance

2017-09-05 Thread Tzu-Li (Gordon) Tai
Hi Navneeth,

Answering your three questions separately:

1. Yes. Your MapState will be backed by RocksDB, so when removing an entry
from the map state, the state will be removed from the local RocksDB as
well.

2. If state classes are not POJOs, they will be serialized with Kryo unless a
custom serializer is explicitly specified. You can take a look at this
document on how to do that [1]; a rough sketch is also included below.

3. I'd need more information to be able to give a proper suggestion for this
one. How are you using the "huge state values"? From what you described, it
seems like you only need them on one of the parallel instances, so I'm a bit
curious what they are actually used for. Are they needed when processing
your records?
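
As a rough illustration of the approach in [1] (untested; the state name and
serializers below are just placeholders), a serializer can be passed directly
to the state descriptor, e.g. inside open():

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

// Instead of letting Flink fall back to Kryo for non-POJO types, a TypeSerializer
// can be given to the descriptor explicitly. StringSerializer/LongSerializer are
// only stand-ins; for your own classes you would implement a custom TypeSerializer
// as described in [1].
MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>(
                "user-state",                 // example state name
                StringSerializer.INSTANCE,    // serializer for the map keys
                LongSerializer.INSTANCE);     // serializer for the map values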

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state


