Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-17 Thread Arvid Heise
Hi Chirag,

Which Flink version are you using? As far as I understand, the issue
appears just from writing the initial data, with no recovery involved, right?

Could you try changing the code so that it performs only a single read
and a single update on the state? It should work the way you have written
it, but I'd like to pinpoint the issue further.
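
A minimal sketch of what a single read and a single update per record
could look like. To keep it runnable without Flink on the classpath,
IntState is a hypothetical stand-in for ValueState<Integer>, and
CountingState and addDelta are illustrative names that are not part of the
original job.

```java
public class SingleReadUpdateSketch {

    /** Hypothetical stand-in for Flink's ValueState<Integer>; not the real interface. */
    interface IntState {
        Integer value();
        void update(Integer newValue);
    }

    /** In-memory state that counts accesses, used only to demonstrate the pattern. */
    static class CountingState implements IntState {
        private Integer stored; // null until the first update, like fresh keyed state
        int reads;
        int writes;

        @Override
        public Integer value() {
            reads++;
            return stored;
        }

        @Override
        public void update(Integer newValue) {
            writes++;
            stored = newValue;
        }
    }

    /** Reads the state once, treats null as 0, and writes the new total once. */
    static int addDelta(IntState state, int delta) {
        Integer current = state.value();                   // the only read
        int sum = (current == null ? 0 : current) + delta;
        state.update(sum);                                 // the only write
        return sum;
    }

    public static void main(String[] args) {
        CountingState state = new CountingState();
        System.out.println(addDelta(state, 5));
        System.out.println(addDelta(state, 3));
        System.out.println("reads=" + state.reads + ", writes=" + state.writes);
    }
}
```

The local variable sum doubles as the value to emit, so the state does not
need to be read again after the update.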


Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-10 Thread Yun Gao
Hi Chirag,

Logically, an Integer key should not have this issue. Sorry, I have not
found any other problems from the current description. Could you also share
the code in the main method that adds the KeyedProcessFunction to the job?
Many thanks!

Best,
Yun


Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-09 Thread Chirag Dewan
Thanks for the reply, Yun.

The key is an Integer. Do you think there can be hash collisions for
Integers?

It somehow works on a single TM now: no errors for 1m records. But as soon
as we move to 2 TMs, we get all sorts of errors: 'Position out of bounds',
'Key group is not in KeyGroupRange', etc.

This also causes an NPE in the user-defined code:

if (valueState != null)
    valueState.value() -> this throws an NPE, so although the null check on
    valueState passed, reading the value still failed.

Thanks,
Chirag

Re: Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-08 Thread Yun Gao
Hi Chirag,

As far as I know, if you are running a single job, having all the pods
share the same state.checkpoints.dir configuration is as expected, and it
is not necessary to configure the RocksDB local dir, since Flink will
choose a default directory.

Regarding the latest exception, I think you might first check the key type
used: the key type should have a stable hashCode method.

Best,
Yun
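
A minimal sketch of what "stable" means here: the hash code is derived only
from field values, so every JVM computes the same number for equal keys.
PlayerKey and its fields are hypothetical and not taken from the job in this
thread.

```java
import java.util.Arrays;
import java.util.Objects;

public class StableKeySketch {

    /** Hypothetical composite key; the class and its fields are illustrative only. */
    static final class PlayerKey {
        private final int playerId;
        private final String region;

        PlayerKey(int playerId, String region) {
            this.playerId = playerId;
            this.region = region;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof PlayerKey)) return false;
            PlayerKey other = (PlayerKey) o;
            return playerId == other.playerId && Objects.equals(region, other.region);
        }

        @Override
        public int hashCode() {
            // Derived only from field values, so it is the same in every JVM.
            return Objects.hash(playerId, region);
        }
    }

    public static void main(String[] args) {
        // Integer.hashCode() is simply the int value, so it is stable.
        System.out.println(Integer.valueOf(42).hashCode());

        // Equal keys yield equal hash codes.
        System.out.println(
                new PlayerKey(7, "eu").hashCode() == new PlayerKey(7, "eu").hashCode());

        // Arrays inherit Object.hashCode(), which is identity-based and may differ
        // between instances and between JVMs; Arrays.hashCode() uses the contents.
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b));
    }
}
```

Types without a value-based hashCode, such as arrays or classes that do not
override hashCode, are the usual candidates to check.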





Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-08 Thread Chirag Dewan
Hi,

Although the shared NFS setup still looks like a problem to me, I can't
confirm it.

I tried reducing my TM replicas from 2 to 1, with 4 slots and 4 cores. I
was hoping that with a single TM there would be no file write conflicts,
but that doesn't seem to be the case, as I still get:

Caused by: org.apache.flink.util.SerializedThrowable:
java.lang.IllegalArgumentException: Key group 2 is not in
KeyGroupRange{startKeyGroup=64, endKeyGroup=95}.

I have checked that there is no concurrent access to the ValueState.

Any more leads?

Thanks,
Chirag
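
For reference, the range in this exception can be reproduced with plain
integer arithmetic. The sketch below mirrors, to the best of my
understanding, how Flink's KeyGroupRangeAssignment splits key groups among
parallel subtasks. It does not call Flink, the method names are
illustrative, and a max parallelism of 128 is an assumption (it is the value
that yields the range 64-95 for subtask 2 of 4).

```java
public class KeyGroupRangeSketch {

    /** First key group owned by the subtask with the given index. */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the subtask with the given index. */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Index of the subtask that owns the given key group. */
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption, see the note above
        int parallelism = 4;      // parallelism of the KeyedProcessFunction in the post

        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + ": key groups "
                    + rangeStart(maxParallelism, parallelism, i) + "-"
                    + rangeEnd(maxParallelism, parallelism, i));
        }
        System.out.println("key group 2 is owned by subtask "
                + ownerOf(2, maxParallelism, parallelism));
    }
}
```

Under these assumptions key group 2 belongs to subtask 0 (range 0-31),
while the backend that raised the exception owns 64-95, so the key in use
at that moment maps to a different subtask than the one accessing the state.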


Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-07 Thread Chirag Dewan
Hi,

I think I found my issue. It would help if someone could confirm it :)

I am using an NFS filesystem for storing my checkpoints, and my Flink
cluster is running on K8s with 2 TMs and 2 JMs.

All my pods share the NFS PVC used for state.checkpoints.dir, and we also
missed setting the RocksDB local dir.

Can this lead to state corruption?

Thanks,
Chirag
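
For reference, the two settings in question would look roughly like this
in flink-conf.yaml. The paths are hypothetical placeholders; the option
names are, as far as I know, the standard Flink configuration keys.

```yaml
state.backend: rocksdb
# Shared, durable location for checkpoints (here the NFS mount seen by all pods).
state.checkpoints.dir: file:///mnt/nfs/flink/checkpoints
# Local working directory for RocksDB files, private to each TaskManager pod.
# If unset, Flink falls back to its temporary directories.
state.backend.rocksdb.localdir: /tmp/flink/rocksdb
```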



Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-06 Thread Chirag Dewan
Thanks for the reply, Yun. Strangely, I don't see any nulls. In fact, this
exception occurs on the first few records, and then the job starts
processing normally.

Also, I don't see any reason for concurrent access to the state in my code.
Could giving the Task Manager more CPU cores than task slots be the reason
for it?

Re: Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-05 Thread Yun Gao
Hi Chirag,

If you are able to reproduce the exception, could you first add some logs
to print the values of valueState, valueState.value(), inEvent and
inEvent.getPrizeDelta()?
I think any of these being null would cause the NullPointerException here.

For the second exception, I found a similar issue [1], caused by concurrent
access to the value state. Do we have a similar situation here?

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-18587
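
To illustrate why a null operand matters on the flagged line: in Java,
adding to a null Integer throws a NullPointerException during
auto-unboxing. A minimal sketch, assuming the operands are boxed Integers
(the original post does not show the return type of getPrizeDelta(), and
the method names below are illustrative):

```java
public class UnboxingNpeSketch {

    /** Adds two boxed values; unboxing a null operand throws NullPointerException. */
    static int unsafeAdd(Integer stateValue, Integer delta) {
        return stateValue + delta;
    }

    /** Same addition, but a null operand is treated as 0 instead of being unboxed. */
    static int safeAdd(Integer stateValue, Integer delta) {
        int left = (stateValue == null) ? 0 : stateValue;
        int right = (delta == null) ? 0 : delta;
        return left + right;
    }

    public static void main(String[] args) {
        System.out.println(safeAdd(null, 5));
        try {
            unsafeAdd(null, 5);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from unboxing a null Integer");
        }
    }
}
```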




-- Original Mail --
Sender: Chirag Dewan
Send Date: Sat Jun 5 20:29:37 2021
Recipients: User
Subject: Multiple Exceptions during Load Test in State Access APIs with RocksDB

Hi,

I am getting multiple exceptions while trying to use RocksDB as astate backend. 
I have 2 Task Managers with 2 taskslots and 4 cores each. 
Below is our setup:
Kafka(Topic with 2 partitions) ---> FlinkKafkaConsumer(2Parallelism) > 
KeyedProcessFunction(4 Parallelism) > FlinkKafkaProducer(1Parallelism) 
> KafkaTopic
public class Aggregator_KeyedExpression extends KeyedProcessFunction {

    private ValueState valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor descriptor =
            new ValueStateDescriptor("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -> NullPointerException on this line
        int sum = valueState.value();
        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}

While doing a load test, I get a NullPointerException in valueState.value(),
which seems strange as we would have updated the value state above.

Another strange thing is that this is observed only under load conditions; it
works fine otherwise.

We also see some serialization exceptions:

Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)


Any leads would be appreciated. Thanks

Chirag



Multiple Exceptions during Load Test in State Access APIs with RocksDB

2021-06-05 Thread Chirag Dewan
Hi,

I am getting multiple exceptions while trying to use RocksDB as a state backend.

I have 2 Task Managers with 2 task slots and 4 cores each.

Below is our setup:

Kafka(Topic with 2 partitions) ---> FlinkKafkaConsumer(2 Parallelism) --->
KeyedProcessFunction(4 Parallelism) ---> FlinkKafkaProducer(1 Parallelism)
---> KafkaTopic

  

public class Aggregator_KeyedExpression extends KeyedProcessFunction {

    private ValueState valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor descriptor =
            new ValueStateDescriptor("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -> NullPointerException on this line
        int sum = valueState.value();
        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}

While doing a load test, I get a NullPointerException in valueState.value(),
which seems strange as we would have updated the value state above.

Another strange thing is that this is observed only under load conditions; it
works fine otherwise.

We also see some serialization exceptions:

Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)

Any leads would be appreciated. Thanks
Chirag