Compressing Avro entities in State

2021-08-09 Thread Sandeep khanzode
Hello,

I am using the Maven Avro plugin to create SpecificRecord classes for a bunch 
of entities specified in .avsc files.

I use the generated Java classes directly in a few MapStates.


My state is now growing very large. I wanted to know whether there is any 
config or code option to specify Snappy, Parquet, or any other compression 
format, either by default or explicitly.
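
For reference, there is no Flink switch that compresses individual state entries 
with Parquet (Parquet is a columnar file format, not a codec), but two knobs may 
help: ExecutionConfig#setUseSnapshotCompression(true) enables Snappy compression 
of savepoints/full snapshots, and with the RocksDB backend the on-disk block 
compression can be chosen per column family. A minimal sketch of the latter, 
assuming Flink 1.12's RocksDBOptionsFactory (the checkpoint path is a placeholder):

    import java.util.Collection;
    import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.CompressionType;
    import org.rocksdb.DBOptions;

    RocksDBStateBackend rocksDbBackend =
            new RocksDBStateBackend("file:///tmp/checkpoints", true);
    rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions,
                Collection<AutoCloseable> handlesToClose) {
            return currentOptions;
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                Collection<AutoCloseable> handlesToClose) {
            // Snappy is RocksDB's default; LZ4 or ZSTD trade CPU for a better ratio.
            return currentOptions.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
        }
    });

Note this compresses the on-disk state, not the in-memory Avro objects themselves.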


Thanks,
Sandip





Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-09 Thread Sandeep khanzode
Hi Yun,

Yes, I will check how these classes are getting included. 

I do see the below coming in from a Kafka-Streams dependency being used for 
another module.

org.rocksdb:rocksdbjni:jar:5.18.4:compile
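
For reference, a quick way to confirm which module pulls it in transitively (a 
sketch using Maven's standard dependency plugin):

    mvn dependency:tree -Dincludes=org.rocksdb:rocksdbjni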

Thanks,
Sandip


> On 10-Aug-2021, at 8:22 AM, Yun Tang  wrote:
> 
> Hi Sandeep,
> 
> I'm afraid it's because the configuration of your Maven plugins (such as 
> maven-assembly or maven-shade) includes the classes.
> 
> Best
> Yun Tang
> From: Sandeep khanzode 
> Sent: Saturday, August 7, 2021 10:20
> To: Yun Tang 
> Cc: Stephan Ewen ; user 
> Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading
>  
> Hi Yun,
> 
> You are right. This is what I see in my application JAR … 
> 
> Does this mean that the RocksDB dependency is still getting packaged in my 
> application JAR and that this is what is causing the issue?
> 
> I have made the scope as Provided for that dependency. Is it being 
> transitively pulled in? All my Flink dependencies are set to Provided in 
> Maven.
> 
> 
> 
> 
> 
>> On 06-Aug-2021, at 7:58 AM, Yun Tang <myas...@live.com> wrote:
>> 
>> Hi Sandeep,
>> 
>> If you set flink-statebackend-rocksdb as provided scope, it should not 
>> include the org.rocksdb classes. Have you checked your application jar 
>> package directly, just as I described?
>> 
>> 
>> Best
>> Yun Tang
>> From: Sandeep khanzode <sand...@shiftright.ai>
>> Sent: Friday, August 6, 2021 2:04
>> To: Stephan Ewen <se...@apache.org>
>> Cc: user <user@flink.apache.org>; Yun Tang <myas...@live.com>
>> Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading
>>  
>> Hello Stephan, Yun,
>> 
>> Thanks for your insights.
>> 
>> All I have added is this:
>> 
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>     <version>${flink.version}</version>
>>     <scope>provided</scope>
>> </dependency>
>> 
>> No other library is explicitly added. I am assuming, as mentioned, that the 
>> flink-dist.jar already contains the relevant classes and that the app or 
>> parent classloader loads the RocksDB classes. All other Flink dependencies 
>> are packaged with Maven scope Provided.
>> 
>> Moving to parent-first gives the Spring Framework serialisation issues … I 
>> will take a look at that …
>> 
>> I thought it would be simpler to simply specify Bloom Filters as an option … 
>> 
>> Maybe, I will have to remove Spring dependency … 
>> 
>> 
>> Thanks,
>> Sandip
>> 
>> 
>> 
>>> On 05-Aug-2021, at 5:55 PM, Stephan Ewen <se...@apache.org> wrote:
>>> 
>>> @Yun Tang
>>> 
>>> Our FRocksDB has the same java package names (org.rocksdb.). Adding 
>>> 'org.rocksdb' to parent-first patterns ensures it will be loaded only once, 
>>> and not accidentally multiple times (as Child-first classloading does).
>>> 
>>> The RocksDB code here is a bit like Flink internal components, which we 
>>> always force parent-first (everything that starts with "org.apache.flink.").
>>> 
>>> To use RocksDB from the application jar, I think you would need to remove 
>>> the RocksDB state backend from the classpath (lib folder), or you get 
>>> exactly the error reported above.
>>> 
>>> I cannot think of a downside to add RocksDB to the parent first patterns.
>>> 
>>> On Thu, Aug 5, 2021 at 10:04 AM Yun Tang <myas...@live.com> wrote:
>>> Hi Stephan,
>>> 
>>> Since we use our own FRocksDB instead of the original RocksDB as a 
>>> dependency, I am not sure whether this problem is related to that. In my 
>>> experience, many customers include Flink classes within the application 
>>> jar package, and that can cause problems if the client has a different 
>>> Flink version than the servers.
>>> 
>>> 
>>> Best,
>>> Yun Tang
>>> From: Stephan Ewen <se...@apache.org>
>>> Sent: Wednesday, August 4, 2021 19:10
>>> To: Yun Tang <myas...@live.com>
>>> Cc: Sandeep khanzode <sand...@shiftright.ai>; user <user@flink.apache.org>
>>> Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading
>>>  
>>> @Yun Tang Does it make sense to add RocksDB to the "always parent-first 
>>> options" to avoid these kind of errors when users package apps incorrectly?
>>> My feeling is that these packaging errors occur very frequently.

Re: Bloom Filter - RocksDB - LinkageError Classloading

2021-08-05 Thread Sandeep khanzode
Hello Stephan, Yun,

Thanks for your insights.

All I have added is this:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>


No other library is explicitly added. I am assuming, as mentioned, that the 
flink-dist.jar already contains the relevant classes and that the app or parent 
classloader loads the RocksDB classes. All other Flink dependencies are 
packaged with Maven scope Provided.

Moving to parent-first gives me Spring Framework serialization issues … I will 
take a look at that …
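
For reference, a possible middle ground, assuming Flink's classloading options: 
forcing only the RocksDB classes parent-first in flink-conf.yaml, so the Spring 
classes can stay child-first:

    classloader.parent-first-patterns.additional: org.rocksdb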

I thought it would be simpler to simply specify Bloom Filters as an option … 

Maybe, I will have to remove Spring dependency … 


Thanks,
Sandip



> On 05-Aug-2021, at 5:55 PM, Stephan Ewen  wrote:
> 
> @Yun Tang
> 
> Our FRocksDB has the same java package names (org.rocksdb.). Adding 
> 'org.rocksdb' to parent-first patterns ensures it will be loaded only once, 
> and not accidentally multiple times (as Child-first classloading does).
> 
> The RocksDB code here is a bit like Flink internal components, which we 
> always force parent-first (everything that starts with "org.apache.flink.").
> 
> To use RocksDB from the application jar, I think you would need to remove the 
> RocksDB state backend from the classpath (lib folder), or you get exactly the 
> error reported above.
> 
> I cannot think of a downside to add RocksDB to the parent first patterns.
> 
> On Thu, Aug 5, 2021 at 10:04 AM Yun Tang <myas...@live.com> wrote:
> Hi Stephan,
> 
> Since we use our own FRocksDB instead of the original RocksDB as a dependency, 
> I am not sure whether this problem is related to that. In my experience, many 
> customers include Flink classes within the application jar package, and that 
> can cause problems if the client has a different Flink version than the 
> servers.
> 
> 
> Best,
> Yun Tang
> From: Stephan Ewen <se...@apache.org>
> Sent: Wednesday, August 4, 2021 19:10
> To: Yun Tang <myas...@live.com>
> Cc: Sandeep khanzode <sand...@shiftright.ai>; user <user@flink.apache.org>
> Subject: Re: Bloom Filter - RocksDB - LinkageError Classloading
>  
> @Yun Tang Does it make sense to add RocksDB to the "always parent-first 
> options" to avoid these kind of errors when users package apps incorrectly?
> My feeling is that these packaging errors occur very frequently.
> 
> 
> On Wed, Aug 4, 2021 at 10:41 AM Yun Tang <myas...@live.com> wrote:
> Hi Sandeep,
> 
> Did you include the RocksDB classes in the application jar package? You can 
> unpack your jar package to check whether they exist.
> If so, since the RocksDB classes are already included in the flink-dist 
> package, you don't need to include them in your jar package (maybe you 
> explicitly added the dependency of org.rocksdb:rocksdbjni in your pom).
> 
> Best
> Yun Tang
> From: Sandeep khanzode <sand...@shiftright.ai>
> Sent: Wednesday, August 4, 2021 11:54
> To: user <user@flink.apache.org>
> Subject: Bloom Filter - RocksDB - LinkageError Classloading
>  
> Hello,
> 
> I tried to add the bloom filter functionality as mentioned here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html
> 
>  rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
> 
>      @Override
>      public DBOptions createDBOptions(DBOptions currentOptions,
>              Collection<AutoCloseable> handlesToClose) {
>          return currentOptions.setMaxOpenFiles(1024);
>      }
> 
>      @Override
>      public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
>              Collection<AutoCloseable> handlesToClose) {
>          BloomFilter bloomFilter = new BloomFilter();
>          handlesToClose.add(bloomFilter);
> 
>          return currentOptions
>                  .setTableFormatConfig(
>                          new BlockBasedTableConfig().setFilter(bloomFilter));
>      }
>  });
> 
> This is in the main class where we set up the StreamExecutionEnvironment …
> 
> I get ClassLoading errors due to that ...
> Caused by: java.lang.LinkageError: loader constraint violation: loader 
> org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
> org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
> previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
> module of loader 'app')
> 
> 
> What is documented is to change the order to parent-first in the 
> flink-conf.yaml … but then I get different issues for the basic/core Spring 
> Framework classes not being serializable …
> 
> Any help will be appreciated.
> 
> Thanks,
> Sandip



Bloom Filter - RocksDB - LinkageError Classloading

2021-08-03 Thread Sandeep khanzode
Hello,

I tried to add the bloom filter functionality as mentioned here:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.html
 


 rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {

     @Override
     public DBOptions createDBOptions(DBOptions currentOptions,
             Collection<AutoCloseable> handlesToClose) {
         return currentOptions.setMaxOpenFiles(1024);
     }

     @Override
     public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
             Collection<AutoCloseable> handlesToClose) {
         BloomFilter bloomFilter = new BloomFilter();
         handlesToClose.add(bloomFilter);

         return currentOptions
                 .setTableFormatConfig(
                         new BlockBasedTableConfig().setFilter(bloomFilter));
     }
 });

This is in the main class where we set up the StreamExecutionEnvironment …

I get ClassLoading errors due to that ...
Caused by: java.lang.LinkageError: loader constraint violation: loader 
org.apache.flink.util.ChildFirstClassLoader @1169afe1 wants to load class 
org.rocksdb.ColumnFamilyOptions. A different class with the same name was 
previously loaded by 'app'. (org.rocksdb.ColumnFamilyOptions is in unnamed 
module of loader 'app')


What is documented is to change the order to parent-first in the 
flink-conf.yaml … but then I get different issues for the basic/core Spring 
Framework classes not being serializable …
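
For reference, the documented switch is this flink-conf.yaml entry (the default 
is child-first):

    classloader.resolve-order: parent-first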

Any help will be appreciated.

Thanks,
Sandip

Queryable State Lookup Failure

2021-07-23 Thread Sandeep khanzode
Hello,

With the default memory settings, after about 5,000 records in my 
FlinkKafkaConsumer, and some operators in my pipeline, I get the below error:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]


I read about this and tried to increase the memory settings as below, which 
took care of that problem … 

jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2300m
taskmanager.memory.network.max: 768m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.managed.fraction: 0.45
taskmanager.memory.network.min: 192m
taskmanager.memory.task.off-heap.size: 512m


But now I have the below issue at approximately the same point, i.e. after 
about 5,000 records. It doesn't matter whether I send them in a burst or 
stagger them; strangely, after that limit it always blows up, somewhere between 
roughly 4,500 and 5,500 records.

Now I am doing multiple state lookups for the Queryable State. Previously I did 
about 50% of the lookups I do now, and I could ingest millions of records. But 
simply doubling the number of lookups has caused the Queryable State to fail.

What memory settings do I have to change to rectify this? Any help will be 
appreciated.
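
For reference, the stack trace shows the QueryableStateClient's Netty allocator 
running out of direct memory, so if the client runs in its own JVM (outside 
Flink), its direct-memory cap may also need raising; a sketch (the size is a 
guess to be tuned, and the jar/class names are hypothetical placeholders):

    java -XX:MaxDirectMemorySize=512m -cp my-query-client.jar com.example.QueryClient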

I have also seen the BufferPool error sometimes … 


java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed 
request 67.
 Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No 
KvStateLocation found for KvState instance with name 'queryable-data'.
at 
org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
at 
org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown 
Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at 

Re: Flink State Processor API Example - Java

2021-07-02 Thread Sandeep khanzode
Hi Guowei,

I followed the document, but somehow I am unable to get a working Java example 
for Avro state.

So, I tried to simply use the Java SpecificRecords created by the Avro Maven 
Plugin and inject them. That works correctly, but I use Avro 1.7.7 since it is 
the last version I saw that does not put a serialVersionUID in the generated 
SpecificRecord.

How can I use an Avro SpecificRecord generated by 1.8.0 if it uses a 
serialVersionUID? The moment I change something in the Avro schema and 
regenerate the SpecificRecord, I get a class-incompatibility error when Flink 
tries to deserialize.


Thanks,
Sandip



> On 25-Jun-2021, at 10:25 AM, Guowei Ma  wrote:
> 
> Hi Sandeep
> 
> What I understand is that you want to manipulate the state. So I think you 
> could use the old schema to read the state first, and then write it with a new 
> schema, instead of using a new schema to read data in the old schema format.
> In addition, I would like to ask whether you want to do "State Schema 
> Evolution"? Flink currently supports Avro and POJO schema evolution [1], so 
> you don't need to do this manually.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/#supported-data-types-for-schema-evolution
> 
> Best,
> Guowei
> 
> 
> On Fri, Jun 25, 2021 at 3:04 AM Sandeep khanzode <sand...@shiftright.ai> wrote:
> Hello,
> 
> 1.] Can someone please share a working example of how to read 
> ValueState and MapState from a checkpoint and update them? I tried 
> to assemble a working Java example but there are only bits and pieces of 
> info around. 
> 
> 2.] I am using Avro 1.7.7 with Flink for state entities, since versions 
> beyond Avro 1.7.7 add a serialVersionUID, and then I cannot replace the class 
> with a new Avro schema seamlessly. If I update the Avro schema and the Avro 
> Maven plugin runs, a new class with a new serialVersionUID is created, and 
> that cannot be replaced in the state: the Java exception states that the 
> local and state copies are different. Any example would be really appreciated.
> 
> Thanks,
> Sandip



Flink State Processor API Example - Java

2021-06-24 Thread Sandeep khanzode
Hello,

1.] Can someone please share a working example of how to read ValueState 
and MapState from a checkpoint and update them? I tried to assemble a 
working Java example but there are only bits and pieces of info around. 

2.] I am using Avro 1.7.7 with Flink for state entities, since versions beyond 
Avro 1.7.7 add a serialVersionUID, and then I cannot replace the class with a 
new Avro schema seamlessly. If I update the Avro schema and the Avro Maven 
plugin runs, a new class with a new serialVersionUID is created, and that cannot 
be replaced in the state: the Java exception states that the local and state 
copies are different. Any example would be really appreciated.
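
For reference, a minimal sketch of reading keyed ValueState with the State 
Processor API (Flink 1.12); the path, uid, state name, and MyKey/MyValue types 
are hypothetical placeholders:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint savepoint =
            Savepoint.load(env, "file:///tmp/checkpoints/chk-42", new MemoryStateBackend());

    DataSet<MyValue> values = savepoint.readKeyedState(
            "my-operator-uid",                       // uid of the stateful operator
            new KeyedStateReaderFunction<MyKey, MyValue>() {
                ValueState<MyValue> state;

                @Override
                public void open(Configuration parameters) {
                    state = getRuntimeContext().getState(
                            new ValueStateDescriptor<>("my-state", MyValue.class));
                }

                @Override
                public void readKey(MyKey key, Context ctx, Collector<MyValue> out)
                        throws Exception {
                    out.collect(state.value());
                }
            });

MapState is read the same way with a MapStateDescriptor inside open(). The state 
backend passed to Savepoint.load should match the one that wrote the snapshot 
(e.g. a RocksDBStateBackend for RocksDB checkpoints), and modified state is 
written back via a BootstrapTransformation with Savepoint.create(...).withOperator(...).write(path).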

Thanks,
Sandip

Re: [Avro] TypeSerializer Example

2021-05-10 Thread Sandeep khanzode
Hello,

Can someone please assist for this query? Thanks!

Thanks,
Sandeep

> On 06-May-2021, at 10:30 AM, Sandeep khanzode  wrote:
> 
> Hi,
> 
> Is there a working example somewhere that I can refer to for writing Avro 
> entities in Flink state, as well as Avro serialization in the 
> KafkaConsumer/Producer?
> 
> I tried to use Avro entities directly, but there is an issue beyond Apache 
> Avro 1.7.7 in that the generated entities have a serialVersionUID. So when I 
> tried to test schema evolution by adding a member, there was a Java 
> serialization issue saying the two generated classes' serialVersionUIDs do 
> not match, i.e. the one stored in the state and the one now being used with 
> the new member variable.
> 
> Is there any configuration that overrides this?
> 
> Request you to please provide some references of samples. Thanks. 
> 
> Thanks,
> Sandeep 
> 
> 
>> On 30-Apr-2021, at 5:00 PM, Timo Walther  wrote:
>> 
>> I also found these pages:
>> 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> 
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
>> 
>> I hope this helps.
>> 
>> Regards,
>> Timo
>> 
>> 
>> On 30.04.21 13:20, Sandeep khanzode wrote:
>>> Hi Timo,
>>> Thanks! I will take a look at the links.
>>> Can you please share if you have any simple (or complex) example of Avro 
>>> state data structures?
>>> Thanks,
>>> Sandeep
>>>> On 30-Apr-2021, at 4:46 PM, Timo Walther  wrote:
>>>> 
>>>> Hi Sandeep,
>>>> 
>>>> did you have a chance to look at this documentation page?
>>>> 
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/custom_serialization.html
>>>> 
>>>> The interfaces might not be easy to implement but are very powerful to 
>>>> address compatibility issues. You can also look into Flink serializers for 
>>>> some examples:
>>>> 
>>>> https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime
>>>> 
>>>> Esp:
>>>> 
>>>> https://github.com/apache/flink/blob/89c6c03660a88a648bbd13b4e6696124fe46d013/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L599
>>>> 
>>>> For the POJO logic.
>>>> 
>>>> By the way, usually we recommend Avro for state data structures if schema 
>>>> evolution is a topic.
>>>> 
>>>> Regards,
>>>> Timo
>>>> 
>>>> 
>>>> 
>>>> On 29.04.21 18:10, Sandeep khanzode wrote:
>>>>> Hello,
>>>>> Is there a working example of a TypeSerializer for a Java type stored in 
>>>>> the State?
>>>>> My requirement is that I should be able to store the Java POJO entity in 
>>>>> the MapState. The state is backed by RocksDBBackend.
>>>>> If I update the entity with a new member variable, I am unable to 
>>>>> deserialise the state into the new entity.
>>>>> I checked this link.
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
>>>>> It does mention that the POJO type is special based on the rules. Does 
>>>>> that mean that I can add or remove member variables for the POJO? I have 
>>>>> been unable to get it to work.
>>>>> Thanks,
>>>>> Sandeep
>>>> 
>> 
> 



[Avro] Re: TypeSerializer Example

2021-05-05 Thread Sandeep khanzode
Hi,

Is there a working example somewhere that I can refer to for writing Avro 
entities in Flink state, as well as Avro serialization in the 
KafkaConsumer/Producer?

I tried to use Avro entities directly, but there is an issue beyond Apache Avro 
1.7.7 in that the generated entities have a serialVersionUID. So when I tried to 
test schema evolution by adding a member, there was a Java serialization issue 
saying the two generated classes' serialVersionUIDs do not match, i.e. the one 
stored in the state and the one now being used with the new member variable.

Is there any configuration that overrides this?
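
For reference, one setting that may be relevant here (a sketch, not a confirmed 
fix, assuming flink-avro is on the classpath): forcing Flink's AvroSerializer 
for generated records, so state uses Avro's schema-evolution rules instead of 
Java's serialVersionUID check:

    env.getConfig().enableForceAvro();

The Avro schema evolution described in the Flink docs only applies when the type 
is serialized by the AvroSerializer rather than falling back to Kryo or Java 
serialization.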

Request you to please provide some references of samples. Thanks. 

Thanks,
Sandeep 


> On 30-Apr-2021, at 5:00 PM, Timo Walther  wrote:
> 
> I also found these pages:
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
> 
> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
> 
> I hope this helps.
> 
> Regards,
> Timo
> 
> 
> On 30.04.21 13:20, Sandeep khanzode wrote:
>> Hi Timo,
>> Thanks! I will take a look at the links.
>> Can you please share if you have any simple (or complex) example of Avro 
>> state data structures?
>> Thanks,
>> Sandeep
>>> On 30-Apr-2021, at 4:46 PM, Timo Walther  wrote:
>>> 
>>> Hi Sandeep,
>>> 
>>> did you have a chance to look at this documentation page?
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/custom_serialization.html
>>> 
>>> The interfaces might not be easy to implement but are very powerful to 
>>> address compatibility issues. You can also look into Flink serializers for 
>>> some examples:
>>> 
>>> https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime
>>> 
>>> Esp:
>>> 
>>> https://github.com/apache/flink/blob/89c6c03660a88a648bbd13b4e6696124fe46d013/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L599
>>> 
>>> For the POJO logic.
>>> 
>>> By the way, usually we recommend Avro for state data structures if schema 
>>> evolution is a topic.
>>> 
>>> Regards,
>>> Timo
>>> 
>>> 
>>> 
>>> On 29.04.21 18:10, Sandeep khanzode wrote:
>>>> Hello,
>>>> Is there a working example of a TypeSerializer for a Java type stored in 
>>>> the State?
>>>> My requirement is that I should be able to store the Java POJO entity in 
>>>> the MapState. The state is backed by RocksDBBackend.
>>>> If I update the entity with a new member variable, I am unable to 
>>>> deserialise the state into the new entity.
>>>> I checked this link.
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
>>>> It does mention that the POJO type is special based on the rules. Does 
>>>> that mean that I can add or remove member variables for the POJO? I have 
>>>> been unable to get it to work.
>>>> Thanks,
>>>> Sandeep
>>> 
> 



Re: TypeSerializer Example

2021-04-30 Thread Sandeep khanzode
Hi Timo,

Thanks! I will take a look at the links.

Can you please share if you have any simple (or complex) example of Avro state 
data structures?

Thanks,
Sandeep

> On 30-Apr-2021, at 4:46 PM, Timo Walther  wrote:
> 
> Hi Sandeep,
> 
> did you have a chance to look at this documentation page?
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/custom_serialization.html
> 
> The interfaces might not be easy to implement but are very powerful to 
> address compatibility issues. You can also look into Flink serializers for 
> some examples:
> 
> https://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime
> 
> Esp:
> 
> https://github.com/apache/flink/blob/89c6c03660a88a648bbd13b4e6696124fe46d013/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L599
> 
> For the POJO logic.
> 
> By the way, usually we recommend Avro for state data structures if schema 
> evolution is a topic.
> 
> Regards,
> Timo
> 
> 
> 
> On 29.04.21 18:10, Sandeep khanzode wrote:
>> Hello,
>> Is there a working example of a TypeSerializer for a Java type stored in the 
>> State?
>> My requirement is that I should be able to store the Java POJO entity in the 
>> MapState. The state is backed by RocksDBBackend.
>> If I update the entity with a new member variable, I am unable to 
>> deserialise the state into the new entity.
>> I checked this link.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
>> It does mention that the POJO type is special based on the rules. Does that 
>> mean that I can add or remove member variables for the POJO? I have been 
>> unable to get it to work.
>> Thanks,
>> Sandeep
> 



TypeSerializer Example

2021-04-29 Thread Sandeep khanzode
Hello,

Is there a working example of a TypeSerializer for a Java type stored in the 
State?

My requirement is that I should be able to store the Java POJO entity in the 
MapState. The state is backed by RocksDBBackend.

If I update the entity with a new member variable, I am unable to deserialise 
the state into the new entity. 

I checked this link.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
 


It does mention that the POJO type is special based on the rules. Does that 
mean that I can add or remove member variables for the POJO? I have been unable 
to get it to work. 


Thanks,
Sandeep

Queryable State unavailable after Kubernetes HA State cleanup

2021-04-28 Thread Sandeep khanzode
Hello,

Stuck at this time. Any help will be appreciated.


I am able to create a queryable state and also query the state. Everything 
works correctly.
KeyedStream<Tuple2<Key, VO>, Key> stream = sourceStream.keyBy(t2 -> t2.f0);
stream.asQueryableState("queryableVO");
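
For reference, a minimal client-side sketch for querying it (Flink 1.12 API); 
the host, port, job id, and the Key/VO types are placeholders matching the 
snippet above, and 9069 is the default proxy port:

    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.queryablestate.client.QueryableStateClient;

    QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

    CompletableFuture<ValueState<Tuple2<Key, VO>>> future = client.getKvState(
            JobID.fromHexString("<job-id>"),
            "queryableVO",
            key,                                   // same key used in keyBy(t2 -> t2.f0)
            TypeInformation.of(Key.class),
            new ValueStateDescriptor<>("queryableVO",
                    TypeInformation.of(new TypeHint<Tuple2<Key, VO>>() {})));

    Tuple2<Key, VO> value = future.get().value();  // asQueryableState stores the full element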


I deploy this on a Kubernetes cluster with Flink standalone-job and 
KubernetesHAFactory. 

There are two states created. One is the operator and keyed state which is 
stored in a RocksDB Backend in S3.

The other is the HA state maintained by Kubernetes in S3.

If anything changes in the job main class (like removing operators etc.), the 
upgrade does not work seamlessly and I have to delete the HA state from S3.

If I delete the S3 state for HA, the queryable state becomes unusable i.e. I 
cannot query from the state anymore. Interestingly, the other operator and 
keyed states in RocksDB backend are still accessible! Just not the queryable 
state.

When I check the UI, I see the checkpointed state for the queryable stream has 
a data size of approx ~50-60KB. But I still cannot query it.


Thanks,
Sandeep



Restore from Checkpoint from local Standalone Job

2021-03-26 Thread Sandeep khanzode
Hello


I was reading this: 
https://stackoverflow.com/questions/61010970/flink-resume-from-externalised-checkpoint-question


I am trying to run a standalone job on my local with a single job manager and 
task manager. 



I have enabled checkpointing as below:
env.setStateBackend(new RocksDBStateBackend("file:///Users/test/checkpoint", true));
env.enableCheckpointing(30 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


After I stop my job (I also tried to cancel the job using bin/flink cancel -s 
/Users/test/savepoint ), I tried to start the same job using…

./standalone-job.sh start-foreground test.jar --job-id   --job-classname 
com.test.MyClass --fromSavepoint /Users/test/savepoint
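
One detail worth checking (a sketch; the job id and checkpoint number are 
placeholders): with RETAIN_ON_CANCELLATION, the retained checkpoint is written 
under <checkpoint-dir>/<job-id>/chk-<n>, and --fromSavepoint must point at that 
concrete directory (the one containing _metadata), e.g.:

    ./standalone-job.sh start-foreground test.jar --job-classname com.test.MyClass \
        --fromSavepoint /Users/test/checkpoint/<job-id>/chk-42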


But it never restores the state, and always starts afresh. 


In Flink, I see this in StandaloneCompletedCheckpointStore (the 
{@link CompletedCheckpointStore} for JobManagers running in 
{@link HighAvailabilityMode#NONE}):

public void recover() throws Exception {
    // Nothing to do
}

Does this have something to do with not being able to restore state?

Does this need Zookeeper or K8S HA for functioning?


Thanks,
Sandeep



FlinkKafkaConsumer - Broadcast - Initial Load

2021-03-25 Thread Sandeep khanzode
Hi,

I have a master/reference data that needs to come in through a 
FlinkKafkaConsumer to be broadcast to all nodes and subsequently joined with 
the actual stream for enriching content.

The Kafka consumer gets CDC-type records from database changes. All this works 
well.


My question is how do I initialise the pipeline for the first set of records in 
the database? i.e. those that are not CDC? 

When I deploy for the first time, I would need all the DB records to be sent to 
the FlinkKafkaConsumer before any CDC updates happen.

Is there a hook that allows for an initial, first-time load of the records in 
the Kafka topic to be broadcast?



Also, about the broadcast state: since we are persisting the state in the 
RocksDB backend, I am assuming that the state backend would have the latest 
records, and even if the task manager crashes and restarts, we will have the 
correct Kafka consumer group topic offsets recorded so that the next time, we 
do not "startFromEarliest"? Is that right?

Will the state always maintain the updates to the records as well as the Kafka 
topic offsets?
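
For reference, a minimal sketch of the usual pattern (topic, group, and types 
are placeholders): the initial DB snapshot is published to the same topic before 
the CDC stream starts, and the consumer reads from the beginning on first 
deployment; on any later restore from a checkpoint/savepoint, the offsets stored 
in Flink state take precedence over the startup mode:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "cdc-consumer");

    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("cdc-topic", new SimpleStringSchema(), props);
    consumer.setStartFromEarliest(); // initial load: snapshot records first, then CDC updates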


Thanks,
Sandeep 

Re: Flink on Minikube

2021-03-25 Thread Sandeep khanzode
Hi Arvid,

Thanks, will set the scope to Provided and try. 

Are there public examples in GitHub that demonstrate a sample app in Minikube? 

Sandeep

> On 23-Mar-2021, at 3:17 PM, Arvid Heise  wrote:
> 
> Hi Sandeep,
> 
> please have a look at [1], you should add most Flink dependencies as provided 
> - exceptions are connectors (or in general stuff that is not in flink/lib/ or 
> flink/plugins).
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#setting-up-a-project-basic-dependencies
> On Tue, Mar 23, 2021 at 5:28 AM Sandeep khanzode <sand...@shiftright.ai> wrote:
> Hi Arvid,
> 
> I copy the JAR to the usrlib folder. This works in the Cloud EKS cluster. I 
> wanted to set this up for my testing purposes.
> 
> Below is the Dockerfile:
> FROM apache/flink:1.12.1-java11
> RUN mv /opt/flink/opt/flink-queryable-state-runtime_2.12-1.12.1.jar 
> /opt/flink/lib/flink-queryable-state-runtime_2.12-1.12.1.jar
> ADD myJar.jar /opt/flink/usrlib/myJar.jar
> 
> … But, in my process, this is a Fat JAR created by the Maven Shade Plugin. 
> Are you saying that all Flink classes should not be part of the user JAR? How 
> does that work? Do we set the scope of the dependencies to compile (or, not 
> runtime) for Flink Jars? Do we have any samples/examples that shows this? 
> Would be really helpful.
> 
> 
>> On 22-Mar-2021, at 8:00 PM, Arvid Heise <ar...@apache.org> wrote:
>> 
>> Hi Sandeep,
>> 
>> The first error definitively indicates a classloading issue, which may also 
>> be the cause for the second error.
>> 
>> Can you describe where you put your jar inside the docker image and which 
>> execution mode you are using? As a general rule, the jar is not supposed to 
>> go into flink/lib.
>> Also make sure to never shade non-connector classes of Flink into your jar. 
>> A typical user jar should be ~1MB.
>> 
>> On Fri, Mar 19, 2021 at 8:58 PM Sandeep khanzode <sand...@shiftright.ai> wrote:
>> Hello,
>> 
>> I have a fat JAR compiled using the Maven Shade plugin and everything works 
>> correctly when I deploy it on a standalone local cluster, i.e. one job and 
>> one task manager node.
>> 
>> But I installed Minikube and the same JAR file packaged into a docker image 
>> fails with weird serialization  errors:
>> 
>> Caused by: java.lang.ClassCastException: cannot assign instance of 
>> java.lang.invoke.SerializedLambda to field 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
>>  of type org.apache.flink.api.java.functions.KeySelector in instance of 
>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
>> 
>> 
>> … or in certain cases, if I comment out everything except the Kafka Source, 
>> then ...
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
>> org.apache.kafka.common.requests.MetadataRequest$Builder
>> 
>> 
>> Is there anything I am missing with the Minikube setup? I initially tried 
>> with the steps for the Job Application cluster on the website, but I was 
>> unable to get the /usrlib mounted from the hostpath. 
>> 
>> 
>> So, I created a simple docker image from ...
>> apache/flink:1.12.1-java11
>> 
>> But I have not had any success getting the same job to run here. Please let 
>> me know if there are well-known steps or issues that I can check.
>> 
>> Thanks,
>> Sandeep
> 



Re: QueryableStateClient getKVState

2021-03-23 Thread Sandeep khanzode
Hi Matthias,

Thanks. But yes, I am comparing map with that.map … the comment is probably 
left over from a previous variable name.

I can use String, int, enum, and Long keys in the key that I send in the 
getKvState query … but the moment I introduce a TreeMap, even though it 
contains a single String-to-String entry, it doesn't work …

Thanks,
Sandeep

> On 23-Mar-2021, at 7:00 PM, Matthias Pohl  wrote:
> 
> Hi Sandeep,
> the equals method does not compare this.map with that.map but 
> that.dimensions ... at least in your commented-out code. Might this be the 
> problem?
> 
> Best,
> Matthias
> 
> On Tue, Mar 23, 2021 at 5:28 AM Sandeep khanzode <sand...@shiftright.ai> wrote:
> Hi,
> 
> I have a stream that exposes the state for Queryable State.
> 
> I am using the key as follows:
> 
> public class MyKey {
>     private Long first;
>     private EnumType myType;
>     private Long second;
> 
>     private TreeMap<String, String> map;
> 
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         MyKey that = (MyKey) o;
>         boolean isEqual = first.longValue() == that.first.longValue() &&
>                 myType.name().equalsIgnoreCase(that.myType.name()) &&
>                 second.longValue() == that.second.longValue();// &&
>                 //map.equals(that.dimensions);
>         return isEqual;
>     }
> 
>     @Override
>     public int hashCode() {
>         int result = first != null ? first.hashCode() : 0;
>         result = 31 * result + (myType != null ? myType.name().hashCode() : 0);
>         result = 31 * result + (second != null ? second.hashCode() : 0);
>         //result = 31 * result + (map != null ? map.hashCode() : 0);
>         return result;
>     }
> }
> 
> 
> If I only set the first three members for the key class, then the key lookup 
> works correctly.
> 
> If I add the TreeMap, then the lookup always fails with the message: "No 
> state found for the given key/namespace".
> 
> What am I doing wrong with the TreeMap as a member in the key class for 
> equals/hashCode?
> 
> Thanks,
> Sandeep



Re: Flink on Minikube

2021-03-22 Thread Sandeep khanzode
Hi Arvid,

I copy the JAR to the usrlib folder. This works in the Cloud EKS cluster. I 
wanted to set this up for my testing purposes.

Below is the Dockerfile:
FROM apache/flink:1.12.1-java11
RUN mv /opt/flink/opt/flink-queryable-state-runtime_2.12-1.12.1.jar 
/opt/flink/lib/flink-queryable-state-runtime_2.12-1.12.1.jar
ADD myJar.jar /opt/flink/usrlib/myJar.jar

… But, in my process, this is a Fat JAR created by the Maven Shade Plugin. Are 
you saying that all Flink classes should not be part of the user JAR? How does 
that work? Do we set the scope of the dependencies to compile (or, not runtime) 
for Flink Jars? Do we have any samples/examples that shows this? Would be 
really helpful.
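
For reference, a pom sketch of what Arvid describes: core Flink artifacts are 
marked provided (so the Shade plugin leaves them out of the fat JAR, since 
flink-dist already ships them), while connectors stay at the default compile 
scope because they are not part of flink-dist:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- connectors such as flink-connector-kafka_2.12 remain compile scope -->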


> On 22-Mar-2021, at 8:00 PM, Arvid Heise  wrote:
> 
> Hi Sandeep,
> 
> The first error definitively indicates a classloading issue, which may also 
> be the cause for the second error.
> 
> Can you describe where you put your jar inside the docker image and which 
> execution mode you are using? As a general rule, the jar is not supposed to 
> go into flink/lib.
> Also make sure to never shade non-connector classes of Flink into your jar. A 
> typical user jar should be ~1MB.
> 
> On Fri, Mar 19, 2021 at 8:58 PM Sandeep khanzode <sand...@shiftright.ai> wrote:
> Hello,
> 
> I have a fat JAR compiled using the Maven Shade plugin and everything works 
> correctly when I deploy it on a standalone local cluster, i.e. one job and one 
> task manager node.
> 
> But I installed Minikube and the same JAR file packaged into a docker image 
> fails with weird serialization  errors:
> 
> Caused by: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
>  of type org.apache.flink.api.java.functions.KeySelector in instance of 
> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
> 
> 
> … or in certain cases, if I comment out everything except the Kafka Source, 
> then ...
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.kafka.common.requests.MetadataRequest$Builder
> 
> 
> Is there anything I am missing with the Minikube setup? I initially tried 
> with the steps for the Job Application cluster on the website, but I was 
> unable to get the /usrlib mounted from the hostpath. 
> 
> 
> So, I created a simple docker image from ...
> apache/flink:1.12.1-java11
> 
> But I have not had any success getting the same job to run here. Please let 
> me know if there are well-known steps or issues that I can check.
> 
> Thanks,
> Sandeep



QueryableStateClient getKVState

2021-03-22 Thread Sandeep khanzode
Hi,

I have a stream that exposes the state for Queryable State.

I am using the key as follows:

public class MyKey {
    private Long first;
    private EnumType myType;
    private Long second;

    private TreeMap<String, String> map;

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyKey that = (MyKey) o;
        boolean isEqual = first.longValue() == that.first.longValue() &&
                myType.name().equalsIgnoreCase(that.myType.name()) &&
                second.longValue() == that.second.longValue();// &&
                //map.equals(that.dimensions);
        return isEqual;
    }

    @Override
    public int hashCode() {
        int result = first != null ? first.hashCode() : 0;
        result = 31 * result + (myType != null ? myType.name().hashCode() : 0);
        result = 31 * result + (second != null ? second.hashCode() : 0);
        //result = 31 * result + (map != null ? map.hashCode() : 0);
        return result;
    }
}


If I only set the first three members for the key class, then the key lookup 
works correctly.

If I add the TreeMap, then the lookup always fails with the message: "No state 
found for the given key/namespace".

What am I doing wrong with the TreeMap as a member in the key class for 
equals/hashCode?
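
For reference: queryable state locates the key group via the key's hashCode(), 
so the key object sent from the client must produce exactly the same hashCode() 
(and serialized form) as the key used in keyBy(). A sketch that includes the 
map consistently in both methods, assuming that is the intent (java.util.Objects 
is used for brevity):

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MyKey that = (MyKey) o;
        return Objects.equals(first, that.first)
                && myType == that.myType
                && Objects.equals(second, that.second)
                && Objects.equals(map, that.map);   // include the map on BOTH sides
    }

    @Override
    public int hashCode() {
        return Objects.hash(first, myType, second, map);
    }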

Thanks,
Sandeep

Flink on Minikube

2021-03-19 Thread Sandeep khanzode
Hello,

I have a fat JAR compiled using the Man Shade plugin and everything  works 
correctly when I deploy it on a standalone local cluster i.e. one job and one 
task manager node.

But I installed Minikube and the same JAR file packaged into a docker image 
fails with weird serialization  errors:

Caused by: java.lang.ClassCastException: cannot assign instance of 
java.lang.invoke.SerializedLambda to field 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
 of type org.apache.flink.api.java.functions.KeySelector in instance of 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner


… or in certain cases, if I comment out everything except the Kafka Source, 
then ...
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.kafka.common.requests.MetadataRequest$Builder


Is there anything I am missing with the Minikube setup? I initially tried with 
the steps for the Job Application cluster on the website, but I was unable to 
get the /usrlib mounted from the hostpath. 


So, I created a simple docker image from ...
apache/flink:1.12.1-java11

But I have not had any success getting the same job to run here. Please let me 
know if there are well-known steps or issues that I can check.

Thanks,
Sandeep

Get JobId and JobManager RPC Address in RichMapFunction executed in TaskManager

2021-02-24 Thread Sandeep khanzode
Hello,

I am deploying a standalone-job cluster (a cluster with a single JobManager and 
TaskManager instance, instantiated with --job-classname and --job-id).

I have map/flatmap/process functions being executed in the various stream 
functions in the Taskmanager for which I need access to the Job Id and the 
JobManager RPC address. How can I get access to these variables? What in-built 
environment/context/configuration functions exist for this purpose?

I need these two variables for queryable-state.
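
For reference, a sketch of what is available inside a rich function, assuming 
the DataStream API: StreamingRuntimeContext exposes the job id, while the 
JobManager address is not exposed and is typically passed in explicitly, e.g. 
via global job parameters (the parameter name below is a placeholder, and the 
sketch assumes main() calls env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args))):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

    public class MyMapper extends RichMapFunction<String, String> {
        private transient JobID jobId;
        private transient String jmAddress;

        @Override
        public void open(Configuration parameters) {
            jobId = ((StreamingRuntimeContext) getRuntimeContext()).getJobId();
            ParameterTool params = (ParameterTool)
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            jmAddress = params.get("jobmanager.rpc.address");
        }

        @Override
        public String map(String value) {
            return jobId + "@" + jmAddress + ": " + value;
        }
    }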

Thanks

Re: State Access Beyond RichCoFlatMapFunction

2021-02-19 Thread Sandeep khanzode
Hello,

Is there an example setup of Queryable State for a Local Embedded Environment?

I am trying to execute Flink programs from within IntelliJ. Any help would be 
appreciated!

Even if not, if there are other examples where QueryableState can be executed 
in a standalone cluster, that would also be good help. Thanks.
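
For reference, a minimal sketch of enabling the queryable state proxy in a 
local/embedded environment (e.g. inside IntelliJ); it assumes 
flink-queryable-state-runtime is on the classpath:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.QueryableStateOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration config = new Configuration();
    config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(1, config);

The client then connects to localhost on the default proxy port 9069.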


> On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:
> 
> (a) It is by design. For keyed state, you can only access state for that key, 
> not others. If you want one value per key, ValueState fits more appropriately 
> than MapState.
> (b) The state-processor-api aims to access/create/modify/upgrade offline 
> savepoints, not running state. Queryable state may meet your requirement, but 
> it has not been actively developed for a while, from my observation, and is 
> still beta. 
> 
> Queryable state: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
> 
> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hello,
>> 
>> I am creating a class that extends RichCoFlatMapFunction. I need to 
>> connect() two streams to basically share the state of one stream in another. 
>> 
>> This is what I do:
>> private transient MapState<KeyClass, ValueClass> state;
>> 
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>             new MapStateDescriptor<>("abc-saved-state",
>>                     Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
>>     state = getRuntimeContext().getMapState(stateDescriptor);
>> }
>> 
>> This works correctly.
>> 
>> 
>> I have two questions:
>> (a) Whenever I debug, I can only see the current key in the MapState, not 
>> all the possible keys that were created before and saved. Next time, I get a 
>> hit for another key, I will only see the other key and not the rest of 
>> previous keys. Is it by design or am I missing something?
>> 
>> (b) Can I somehow access this state beyond the class that holds the state? 
>> I.e. can I access the state in some other class? If not, can I use the 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>  
>> stream elsewhere in the program without corrupting it?
>> 
>> 
>> Your response will be greatly appreciated. I will be happy to add more 
>> details if required.
>> 
>> Thanks,
>> Sandeep Ramesh Khanzode



Re: State Access Beyond RichCoFlatMapFunction

2021-02-12 Thread Sandeep khanzode
Oh okay. Got it. I will check. Thanks.

> On 12-Feb-2021, at 3:14 PM, Kezhu Wang  wrote:
> 
> Hi Sandeep,
> 
> I must have misled you with inaccurate words. I did not mean using 
> CoGroupedStreams, but only CoGroupedStreams.apply as a reference for how to 
> union streams together and key them. This way you can have all three 
> streams' states downstream without duplication.
> 
> Best,
> Kezhu Wang
> On February 11, 2021 at 20:49:20, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hello,
>> 
>> Can you please share if you have some example of CoGroupedStreams? Thanks!
>> 
>>> On 10-Feb-2021, at 3:22 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>> 
>>> > Actually, my use case is that I want to share the state of one stream in 
>>> > two other streams. Right now, I can think of connecting this stream 
>>> > independently with each of the two other streams and manage the state 
>>> > twice, effectively duplicating it.
>>> 
>>> > Only the matching keys (with the two other streams) will do. 
>>> 
>>> I assume that `ConnectedStreams` meets your requirements but you don't 
>>> want to duplicate that state twice? Then, I think there are ways:
>>> 1. Union all three streams to one and then keyBy. You can see 
>>> `CoGroupedStreams` for reference.
>>> 2. You can try `MultipleInputStreamOperator` and 
>>> `AbstractStreamOperatorV2`. But most usages of these two are currently 
>>> Flink tests and internal.
>>>  You could reach out `MultipleInputITCase.testKeyedState` for reference.
>>> 
>>> 
>>> * CoGroupedStreams union: 
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
>>> * MultipleInputITCase.testKeyedState: 
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113
>>> 
>>> On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Yes, but the stream, whose state I want to share, will be indefinite and 
>>>> have a large volume. Also, not all keys from that stream have to go to 
>>>> every Task Node. Only the matching keys (with the two other streams) will 
>>>> do. 
>>>>  
>>>> Please let me know if there is another cleaner way to achieve this. Thanks.
>>>> 
>>>> 
>>>>> On 10-Feb-2021, at 12:44 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>> 
>>>>> Flink has broadcast state to broadcast one stream to other in case you 
>>>>> are not aware of it. It actually duplicates state.
>>>>> 
>>>>> 1. Broadcast state: 
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>> 
>>>>> Best,
>>>>> Kezhu Wang
>>>>> 
>>>>> On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> Thanks a lot for the response. I will try to check Queryable-state for 
>>>>>> this purpose. 
>>>>>> 
>>>>>> Actually, my use case is that I want to share the state of one stream in 
>>>>>> two other streams. Right now, I can think of connecting this stream 
>>>>>> independently with each of the two other streams and manage the state 
>>>>>> twice, effectively duplicating it.
>>>>>> 
>>>>>> I was trying to check whether there are options where I can share this 
>>>>>> state with both the streams but save it only once.
>>>>>> 

Re: State Access Beyond RichCoFlatMapFunction

2021-02-11 Thread Sandeep khanzode
Hello,

Can you please share if you have some example of CoGroupedStreams? Thanks!

> On 10-Feb-2021, at 3:22 PM, Kezhu Wang  wrote:
> 
> > Actually, my use case is that I want to share the state of one stream in 
> > two other streams. Right now, I can think of connecting this stream 
> > independently with each of the two other streams and manage the state 
> > twice, effectively duplicating it.
> 
> > Only the matching keys (with the two other streams) will do. 
> 
> I assume that `ConnectedStreams` meets your requirements but you don't want 
> to duplicate that state twice? Then, I think there are ways:
> 1. Union all three streams to one and then keyBy. You can see 
> `CoGroupedStreams` for reference.
> 2. You can try `MultipleInputStreamOperator` and `AbstractStreamOperatorV2`. 
> But most usages of these two are currently Flink tests and internal.
>  You could reach out `MultipleInputITCase.testKeyedState` for reference.
> 
> 
> * CoGroupedStreams union: 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L369
> * MultipleInputITCase.testKeyedState: 
> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java#L113
> 
> On February 10, 2021 at 17:19:15, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hi,
>> 
>> Yes, but the stream, whose state I want to share, will be indefinite and 
>> have a large volume. Also, not all keys from that stream have to go to every 
>> Task Node. Only the matching keys (with the two other streams) will do. 
>>  
>> Please let me know if there is another cleaner way to achieve this. Thanks.
>> 
>> 
>>> On 10-Feb-2021, at 12:44 PM, Kezhu Wang <kez...@gmail.com> wrote:
>>> 
>>> Flink has broadcast state to broadcast one stream to other in case you are 
>>> not aware of it. It actually duplicates state.
>>> 
>>> 1. Broadcast state: 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>> 
>>> Best,
>>> Kezhu Wang
>>> 
>>> On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>> 
>>>> Hello,
>>>> 
>>>> Thanks a lot for the response. I will try to check Queryable-state for 
>>>> this purpose. 
>>>> 
>>>> Actually, my use case is that I want to share the state of one stream in 
>>>> two other streams. Right now, I can think of connecting this stream 
>>>> independently with each of the two other streams and manage the state 
>>>> twice, effectively duplicating it.
>>>> 
>>>> I was trying to check whether there are options where I can share this 
>>>> state with both the streams but save it only once.
>>>> 
>>>> 
>>>>> On 10-Feb-2021, at 9:05 AM, Kezhu Wang <kez...@gmail.com> wrote:
>>>>> 
>>>>> (a) It is by design. For keyed state, you can only access state for that 
>>>>> key, not others. If you want one value per key, ValueState fits more 
>>>>> appropriately than MapState.
>>>>> (b) The state-processor-api aims to access/create/modify/upgrade offline 
>>>>> savepoints, not running state. Queryable state may meet your 
>>>>> requirement, but it has not been actively developed for a while, from my 
>>>>> observation, and is still beta. 
>>>>> 
>>>>> Queryable state: 
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>>>> 
>>>>> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>>>> 

Re: State Access Beyond RichCoFlatMapFunction

2021-02-10 Thread Sandeep khanzode
Hi,

Yes, but the stream, whose state I want to share, will be indefinite and have a 
large volume. Also, not all keys from that stream have to go to every Task 
Node. Only the matching keys (with the two other streams) will do. 
 
Please let me know if there is another cleaner way to achieve this. Thanks.


> On 10-Feb-2021, at 12:44 PM, Kezhu Wang  wrote:
> 
> Flink has broadcast state to broadcast one stream to other in case you are 
> not aware of it. It actually duplicates state.
> 
> 1. Broadcast state: 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> 
> Best,
> Kezhu Wang
> 
> On February 10, 2021 at 13:03:36, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hello,
>> 
>> Thanks a lot for the response. I will try to check Queryable-state for this 
>> purpose. 
>> 
>> Actually, my use case is that I want to share the state of one stream in two 
>> other streams. Right now, I can think of connecting this stream 
>> independently with each of the two other streams and manage the state twice, 
>> effectively duplicating it.
>> 
>> I was trying to check whether there are options where I can share this state 
>> with both the streams but save it only once.
>> 
>> 
>>> On 10-Feb-2021, at 9:05 AM, Kezhu Wang <kez...@gmail.com> wrote:
>>> 
>>> (a) It is by design. For keyed state, you can only access state for that 
>>> key, not others. If you want one value per key, ValueState fits more 
>>> appropriately than MapState.
>>> (b) The state-processor-api aims to access/create/modify/upgrade offline 
>>> savepoints, not running state. Queryable state may meet your requirement, 
>>> but it has not been actively developed for a while, from my observation, 
>>> and is still beta. 
>>> 
>>> Queryable state: 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>> 
>>> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
>>> 
>>>> Hello,
>>>> 
>>>> I am creating a class that extends RichCoFlatMapFunction. I need to 
>>>> connect() two streams to basically share the state of one stream in 
>>>> another. 
>>>> 
>>>> This is what I do:
>>>> private transient MapState<KeyClass, ValueClass> state;
>>>> 
>>>> @Override
>>>> public void open(Configuration parameters) throws Exception {
>>>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>>>             new MapStateDescriptor<>("abc-saved-state",
>>>>                     Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
>>>>     state = getRuntimeContext().getMapState(stateDescriptor);
>>>> }
>>>> 
>>>> This works correctly.
>>>> 
>>>> 
>>>> I have two questions:
>>>> (a) Whenever I debug, I can only see the current key in the MapState, not 
>>>> all the possible keys that were created before and saved. Next time, I get 
>>>> a hit for another key, I will only see the other key and not the rest of 
>>>> previous keys. Is it by design or am I missing something?
>>>> 
>>>> (b) Can I somehow access this state beyond the class that holds the state? 
>>>> I.e. can I access the state in some other class? If not, can I use the 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>>>  to do this? Is that the correct way to access the running state of one 
>>>> stream elsewhere in the program without corrupting it?
>>>> 
>>>> 
>>>> Your response will be greatly appreciated. I will be happy to add more 
>>>> details if required.
>>>> 
>>>> Thanks,
>>>> Sandeep Ramesh Khanzode



Re: State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Sandeep khanzode
Hello,

Thanks a lot for the response. I will try to check Queryable-state for this 
purpose. 

Actually, my use case is that I want to share the state of one stream in two 
other streams. Right now, I can think of connecting this stream independently 
with each of the two other streams and manage the state twice, effectively 
duplicating it.

I was trying to check whether there are options where I can share this state 
with both the streams but save it only once.


> On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:
> 
> (a) It is by design. For keyed state, you can only access state for that key, 
> not others. If you want one value per key, ValueState fits more appropriately 
> than MapState.
> (b) The state-processor-api aims to access/create/modify/upgrade offline 
> savepoints, not running state. Queryable state may meet your requirement, but 
> it has not been actively developed for a while, from my observation, and is 
> still beta. 
> 
> Queryable state: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
> 
> On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai) wrote:
> 
>> Hello,
>> 
>> I am creating a class that extends RichCoFlatMapFunction. I need to 
>> connect() two streams to basically share the state of one stream in another. 
>> 
>> This is what I do:
>> private transient MapState<KeyClass, ValueClass> state;
>> 
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>     MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
>>             new MapStateDescriptor<>("abc-saved-state",
>>                     Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
>>     state = getRuntimeContext().getMapState(stateDescriptor);
>> }
>> 
>> This works correctly.
>> 
>> 
>> I have two questions:
>> (a) Whenever I debug, I can only see the current key in the MapState, not 
>> all the possible keys that were created before and saved. Next time, I get a 
>> hit for another key, I will only see the other key and not the rest of 
>> previous keys. Is it by design or am I missing something?
>> 
>> (b) Can I somehow access this state beyond the class that holds the state? 
>> I.e. can I access the state in some other class? If not, can I use the 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>  to do this? Is that the correct way to access the running state of one 
>> stream elsewhere in the program without corrupting it?
>> 
>> 
>> Your response will be greatly appreciated. I will be happy to add more 
>> details if required.
>> 
>> Thanks,
>> Sandeep Ramesh Khanzode



State Access Beyond RichCoFlatMapFunction

2021-02-09 Thread Sandeep khanzode
Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to connect() 
two streams to basically share the state of one stream in another. 

This is what I do:
private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
            new MapStateDescriptor<>("abc-saved-state",
                    Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);
}

This works correctly.
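
For what it's worth, the same descriptor can also be flagged queryable before 
first use, which registers the state with the queryable state server (a sketch, 
Flink 1.12 API):

    stateDescriptor.setQueryable("abc-saved-state");
    state = getRuntimeContext().getMapState(stateDescriptor);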


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not all 
the possible keys that were created before and saved. Next time, I get a hit 
for another key, I will only see the other key and not the rest of previous 
keys. Is it by design or am I missing something?

(b) Can I somehow access this state beyond the class that holds the state? I.e. 
can I access the state in some other class? If not, can I use the 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
to do this? Is that the correct way to access the running state of one stream 
elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more details 
if required.

Thanks,
Sandeep Ramesh Khanzode