Re: Queryable state feature in latest version!!

2023-11-06 Thread Hangxiang Yu
Hi, Puneet.
Queryable State has been deprecated in the latest version and will be
removed in Flink 2.0.
The interface and usage are frozen in 1.x, so you can still refer to the
documentation of previous versions to use it.
BTW, could you also share something about the scenarios in which you use it? That
would help us a lot in designing a better feature for querying state.
Thanks~


On Tue, Nov 7, 2023 at 5:53 AM Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> Hi All
>
> We are using Flink 1.10, which provides Queryable State for
> querying the in-memory state. We are planning to migrate our old
> applications to a newer version of Flink, but I can't find any
> reference to it in the documentation of the latest version. Can anyone
> point out the recommended approach for querying in-memory state in the
> latest versions?
>
> Thanks
>
> Puneet
> Customer Centria Enterprise Solutions Pvt. Ltd. (‘Customer Centria’) has
> made the following annotations, “This message and the attachments are
> solely for the intended recipient and may contain confidential or
> privileged information. If you are not the intended recipient, any
> disclosure, copying, use, or distribution of the information included in
> the message and any attachments is prohibited. If you have received this
> communication in error, please notify us by reply e-mail immediately and
> permanently delete this message and any attachments. Thank you.
>


-- 
Best,
Hangxiang.


Re: Queryable state feature in latest version!!

2023-11-06 Thread Junrui Lee
Hi, Puneet


Thank you for reaching out. In the latest release of Flink (version 1.18),
we have marked Queryable State as @Deprecated and removed the related
content from the stable documentation. This means that Queryable State is
no longer actively supported or recommended for use. More details can be
found here: https://lists.apache.org/thread/9hmwcjb3q5c24pk3qshjvybfqk62v17m


If you plan to migrate to a version prior to 1.18, such as 1.17, you can
refer to the documentation at the following link for information on how to
query in-memory state using Queryable State:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/queryable_state/


Best regards,

Junrui

On Tue, Nov 7, 2023 at 10:28 AM Xuyang wrote:

> Hi, Puneet.
>
> Do you mean this doc[1]?
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/
>
>
> --
> Best!
> Xuyang


Re: Queryable State Deprecation

2022-03-11 Thread Ron Crocker
Hi Dawid -

I’m pretty keen on keeping it alive. Do we have a sense of what it would take 
to get it “to a production ready state?”

Thanks!

Ron

> On Feb 4, 2022, at 5:06 AM, Dawid Wysakowicz  wrote:
> 
> Hi Karthik,
> 
> The reason we deprecated it is because we lacked committers who could spend 
> time on getting the Queryable state to a production ready state. I might be 
> speaking for myself here, but I think the main use case for the queryable 
> state is to have an insight into the current state of the application for 
> debugging purposes. If it is used for data serving purposes, we believe it's 
> better to sink the data into an external store, which can provide better 
> discoverability and more user friendly APIs for querying the results.
> 
> As for debugging/tracking insights you may try to achieve similar results 
> with metrics.
> 
> Best,
> 
> Dawid
> 
> On 01/02/2022 16:36, Jatti, Karthik wrote:
>> Hi, 
>> 
>> I see on the Flink Roadmap that the Queryable State API is scheduled to be 
>> deprecated, but I couldn’t find much information on Confluence or in this 
>> mailing list’s archives to understand why it’s being deprecated and what 
>> an alternative would be. Any pointers to help me get some more 
>> information here would be great. 
>>  
>> Thanks,
>> Karthik 
>> 
>> 
>> The information in the email message containing a link to this page, 
>> including any attachments thereto (collectively, “the e-mail”), is only for 
>> use by the intended recipient(s). The e-mail may contain information that is 
>> confidential, proprietary and/or privileged. If you have reason to believe 
>> that you are not the intended recipient, please notify the sender that you 
>> may have received this e-mail in error and delete all copies of it, 
>> including attachments, from your computer. Any viewing, copying, disclosure 
>> or distribution of this information by an unintended recipient is prohibited 
>> and by an intended recipient may be governed by arrangements in place 
>> between the sender’s and recipient’s respective firms. Eze Software does not 
>> represent that the e-mail is virus-free, complete or accurate. Eze Software 
>> accepts no liability for any damage sustained in connection with the content 
>> or transmission of the e-mail.
>> Copyright © 2013 Eze Castle Software LLC. All Rights Reserved.



Re: Queryable State Deprecation

2022-02-12 Thread Frank Dekervel

Hello,

This is what we did, but I'm not quite convinced that it's the best way 
(maybe others could chime in?).


 * We have a Zalando Postgres cluster running next to the Flink
   cluster, so we can just use a JDBC sink for the state. In theory we
   should be able to switch to exactly-once (we haven't done this so far).
 * Our stateful processor is a state machine that emits outgoing
   messages based on incoming messages. Sometimes we need to "rewind"
   the state machine to correctly process an incoming message. This
   forces us to keep some history of past messages.
 * We don't materialize the state directly; we only materialize the
   state changes, which are then re-materialized in Postgres. It took
   us some time to make this bug-free. While we were still debugging
   this, we read a savepoint to inspect the state and compare it with
   what we had in Postgres.

In a Zalando Postgres cluster you can only write to the master. But for 
readers, if a small delay is acceptable, you can load balance across the 
replicas.
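
For illustration only: one way to keep an externally materialized copy consistent when state changes are replayed after a restore is to make applying a change idempotent. The sketch below is not the setup described above. It assumes every change carries a per-key sequence number (an assumption of this example), and the class StateChangeStore with its in-memory maps is a hypothetical stand-in for a Postgres table.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for an external table that is rebuilt from a stream of state changes. */
final class StateChangeStore {
    // Last applied sequence number and current value per key (hypothetical schema).
    private final Map<String, Long> lastSeq = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();

    /**
     * Applies a change unless an equal or newer change was already applied for the key.
     * Returns true if the store was modified.
     */
    boolean apply(String key, long seq, String newValue) {
        Long applied = lastSeq.get(key);
        if (applied != null && applied >= seq) {
            return false; // replayed or stale change: skip it
        }
        lastSeq.put(key, seq);
        values.put(key, newValue);
        return true;
    }

    /** Returns the current value for the key, or null if no change was applied yet. */
    String valueOf(String key) {
        return values.get(key);
    }

    public static void main(String[] args) {
        StateChangeStore store = new StateChangeStore();
        store.apply("order-1", 1, "CREATED");
        store.apply("order-1", 2, "PAID");
        // Simulate a replay after restoring from a checkpoint: change 1 arrives again.
        boolean modified = store.apply("order-1", 1, "CREATED");
        System.out.println("replay modified store: " + modified);
        System.out.println("current value: " + store.valueOf("order-1"));
    }
}
```

With a real database the same check would typically live in the write statement itself (for example an upsert that only updates when the incoming sequence number is larger), so that at-least-once delivery does not move the external copy backwards.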


Greetings,

Frank



Re: Queryable State Deprecation

2022-02-12 Thread Jatti, Karthik
Hi Frank,

What sink did you end up choosing for materializing the state?

Our reason for looking at queryable state is that we have many readers and 
very few writers (the reader-to-writer ratio is in the 1000s). Each consuming 
application (reader) needs a live view of a subset of the state, and these 
applications come online and go offline many times a day. What would be a good 
sink in such a scenario?

For example, suppose the state of the Flink app is a dynamic table of product 
inventory built from Kafka streams of purchases and sales, and a subset of this 
state needs to be available to 1000s of readers who have a live view of what is 
in stock, with different aggregations and filters. These consumers come online 
and go offline, so they need to be able to restore their substate and continue 
to receive updates for it.
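
For illustration only, the access pattern described above (restore a filtered subset, then keep receiving updates for it) can be sketched in plain Java. This is a toy in-memory model, not a recommendation for a particular sink. The class InventoryView and its methods are hypothetical names, and a real serving layer would sit in a separate store or service fed by the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

/** Toy serving layer: a reader gets a filtered snapshot first, then live updates. */
final class InventoryView {
    private static final class Subscription {
        final Predicate<String> filter;
        final BiConsumer<String, Integer> listener;

        Subscription(Predicate<String> filter, BiConsumer<String, Integer> listener) {
            this.filter = filter;
            this.listener = listener;
        }
    }

    private final Map<String, Integer> stock = new TreeMap<>();
    private final List<Subscription> subscriptions = new ArrayList<>();

    /** Called by the writer side, e.g. a sink fed by the streaming job. */
    synchronized void update(String product, int quantity) {
        stock.put(product, quantity);
        for (Subscription s : subscriptions) {
            if (s.filter.test(product)) {
                s.listener.accept(product, quantity);
            }
        }
    }

    /**
     * Registers a reader and returns the current snapshot of its subset.
     * Both steps happen under the same lock, so no update can slip in between them.
     */
    synchronized Map<String, Integer> subscribe(
            Predicate<String> filter, BiConsumer<String, Integer> listener) {
        subscriptions.add(new Subscription(filter, listener));
        Map<String, Integer> snapshot = new TreeMap<>();
        for (Map.Entry<String, Integer> e : stock.entrySet()) {
            if (filter.test(e.getKey())) {
                snapshot.put(e.getKey(), e.getValue());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        InventoryView view = new InventoryView();
        view.update("shoe-1", 5);
        view.update("hat-1", 2);
        // A reader that only cares about shoes comes online.
        Map<String, Integer> snapshot = view.subscribe(
                p -> p.startsWith("shoe"),
                (p, q) -> System.out.println("update: " + p + " -> " + q));
        System.out.println("snapshot: " + snapshot);
        view.update("shoe-2", 7); // delivered to the reader
        view.update("hat-1", 1);  // filtered out
    }
}
```

The point of the sketch is the ordering guarantee: taking the snapshot and registering for updates happen atomically, which is the property a reader needs when it comes online while writes continue.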

We are evaluating sinks but haven’t narrowed in on anything that looks like 
an obvious choice.

Thanks,
Karthik




Re: Queryable State Deprecation

2022-02-11 Thread Jatti, Karthik
Thank you Frank and Dawid for providing the context here.



Re: Queryable State Deprecation

2022-02-04 Thread Frank Dekervel

Hello,

To give an extra datapoint: after a not so successful experiment with 
faust-streaming we moved our application to Flink. Since Flink's 
queryable state was apparently stagnant, we implemented what was needed 
to sink the state to an external data store for querying.


However, if queryable state had been in good shape we would definitely have 
used it. Making sure that the state is always reflected correctly in our 
external system turned out to be non-trivial for a number of reasons: 
our state is not trivially convertible to rows in a table, and sometimes 
we had (due to our own bugs, but still) inconsistencies between the 
internal Flink state and the externally materialized state, especially 
after replaying from a checkpoint/savepoint after a crash (we cannot use 
exactly_once sinks on all occasions).


Also, obviously, we could not use Flink's partitioning/parallelism to 
help make state querying more scalable.


Greetings,
Frank




Re: Queryable State Deprecation

2022-02-04 Thread Dawid Wysakowicz

Hi Karthik,

We deprecated it because we lacked committers who could 
spend time on getting Queryable State to a production-ready state. I 
might be speaking for myself here, but I think the main use case for 
queryable state is to gain insight into the current state of the 
application for debugging purposes. If it is used for data serving 
purposes, we believe it's better to sink the data into an external 
store, which can provide better discoverability and more user-friendly 
APIs for querying the results.


As for debugging/tracking insights you may try to achieve similar 
results with metrics.


Best,

Dawid



Re: Queryable State Lookup Failure

2021-07-26 Thread Roman Khachatryan
Hello,

Could you check that the TMs didn't fail (and therefore unregister their KV
states) and are still running at the time of the query?
After changing the memory settings, there is probably another error
that is only reported after the state has been unregistered.

Regards,
Roman

On Sat, Jul 24, 2021 at 12:50 AM Sandeep khanzode  wrote:
>
> Hello,
>
> With the default memory settings, after about 5000 records in my 
> KafkaFlinkConsumer, and some operators in my pipeline, I get the below error:
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
> at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
> at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>
>
>
> I read about this and tried to increase the memory settings as below, which 
> took care of that problem …
>
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 2300m
> taskmanager.memory.network.max: 768m
> taskmanager.memory.network.fraction: 0.1
> taskmanager.memory.managed.fraction: 0.45
> taskmanager.memory.network.min: 192m
> taskmanager.memory.task.off-heap.size: 512m
>
>
>
> But now I hit the issue below at roughly the same point, i.e. after about 
> 5000 records. It doesn’t matter whether I send them in a burst or stagger 
> them; strangely, it always blows up past that limit, i.e. somewhere 
> between roughly 4,500 and 5,500 records.
>
> I am now doing multiple state lookups against the Queryable State. Previously I 
> did about 50% of the lookups I do now, and I could ingest millions 
> of records. But simply doubling the number of lookups has caused the 
> Queryable State to fail.
>
> What memory settings do I have to change to rectify this? Any help will be 
> appreciated.
>
> I have also seen the BufferPool error sometimes …
>
>
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 67.
>  Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'queryable-data'.
> at org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
> at org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
> at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at 

Re: Queryable State unavailable after Kubernetes HA State cleanup

2021-04-29 Thread Till Rohrmann
Hi Sandeep,

I don't fully understand the problematic scenario yet. What exactly is the
HA state maintained by Kubernetes in S3?

Queryable state works by asking for the current state of an operator. If
you use asQueryableState, then you create a reducing state which appends
all stream elements. This should then be stored in the configured state
backend (in your case probably RocksDB). For checkpoints, this state is
stored periodically on S3.

How is the query operation failing? Did you check whether the cluster logs
contain anything suspicious?

Cheers,
Till

On Wed, Apr 28, 2021 at 5:26 PM Sandeep khanzode 
wrote:

> Hello,
>
> Stuck at this time. Any help will be appreciated.
>
>
> I am able to create a queryable state and also query the state. Everything
> works correctly.
>
> KeyedStream, Key> stream = sourceStream.keyBy(t2 -> t2.f0);
> stream.asQueryableState("queryableVO");
>
>
> I deploy this on a Kubernetes cluster with Flink standalone-job and
> KubernetesHAFactory.
>
> There are two states created. One is the operator and keyed state which is
> stored in a RocksDB Backend in S3.
>
> The other is the HA state maintained by Kubernetes in S3.
>
> If anything changes in the job main class (like removing operators etc.),
> the upgrade does not work seamlessly and I have to delete the HA state from
> S3.
>
> If I delete the S3 state for HA, the queryable state becomes unusable i.e.
> I cannot query from the state anymore. Interestingly, the other operator
> and keyed states in RocksDB backend are still accessible! Just not the
> queryable state.
>
> When I check the UI, I see the checkpointed state for the queryable stream
> has a data size of approx ~50-60KB. But I still cannot query it.
>
>
> Thanks,
> Sandeep
>
>


Re: Queryable state on task managers that are not running the job

2020-12-23 Thread Yun Tang
Hi Martin,

Which deployment mode did you choose? If you use per-job mode [1] to launch 
jobs, there might only be idle slots rather than idle taskmanagers. 
Currently, queryable state is bound to a specific job, and if the idle 
taskmanager is not registered with the target's resource manager, no queryable 
state can be queried.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#per-job-cluster-mode
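
For illustration only: if idle task managers cannot be avoided, a client could try several proxy hosts and use the first one that answers. The sketch below is plain Java and does not call any Flink API. ProxyFallback, queryFirstAvailable and the lookup function are hypothetical names, and the lookup is a placeholder for whatever client call is actually used. It is assumed to return a non-null result and to throw a RuntimeException when the contacted host cannot serve the request.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Client-side fallback across several candidate proxy hosts. */
final class ProxyFallback {
    /**
     * Tries each host in order and returns the first successful, non-null answer.
     * Returns an empty Optional if every host failed.
     */
    static <R> Optional<R> queryFirstAvailable(List<String> hosts, Function<String, R> lookup) {
        for (String host : hosts) {
            try {
                return Optional.of(lookup.apply(host));
            } catch (RuntimeException e) {
                // This host could not serve the request (for example, the job is
                // not running there); fall through and try the next host.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("tm-1", "tm-2", "tm-3");
        // Simulated lookup: tm-1 is idle and rejects the query, the others answer.
        Optional<String> result = queryFirstAvailable(hosts, host -> {
            if (host.equals("tm-1")) {
                throw new IllegalStateException("job not found on " + host);
            }
            return "state served by " + host;
        });
        System.out.println(result.orElse("no task manager could serve the query"));
    }
}
```

This only hides the error from the caller at the cost of extra round trips; sizing the cluster so that no task manager is idle, as described in the question, avoids the failed attempts altogether.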

Best
Yun Tang

From: Martin Boyanov 
Sent: Monday, December 21, 2020 19:04
To: user@flink.apache.org 
Subject: Queryable state on task managers that are not running the job

Hi,
I'm running a long-running flink job in cluster mode and I'm interested in 
using the queryable state functionality.
I have the following problem: when I query the flink task managers (i.e. the 
queryable state proxy), it is possible to hit a task manager which doesn't have 
the requested state, because the job is not running on that task manager.
For example, I might have a cluster with 5 task managers, but the job is 
deployed only on 3 of those. If my query hits any of the two idle task 
managers, I naturally get an error message that the job does not exist.
My current solution is to size the cluster appropriately so that there are no 
idle task managers. I was wondering if there was a better solution or if this 
could be handled better in the future?
Thanks in advance.
Kind regards,
Martin
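
One possible client-side mitigation for the idle task manager case described
above is to try several proxy hosts in turn and use the first one that
answers. The sketch below only illustrates that control flow in plain Java.
ProxyFallback, queryFirstAvailable and the host names are hypothetical, and
the `query` function stands in for whatever call the client makes against a
single proxy; none of this is part of the Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical helper: tries each queryable state proxy host until one answers. */
final class ProxyFallback {

    /**
     * Applies `query` to each host in order and returns the first result obtained
     * without an exception. A host whose query throws (for example an idle task
     * manager that does not know the job) is skipped. Returns an empty Optional
     * if every host fails or the successful query returned null.
     */
    static <T> Optional<T> queryFirstAvailable(List<String> hosts, Function<String, T> query) {
        for (String host : hosts) {
            T result;
            try {
                result = query.apply(host);
            } catch (RuntimeException e) {
                // This proxy could not serve the request; try the next host.
                continue;
            }
            return Optional.ofNullable(result);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("tm-1", "tm-2", "tm-3");
        // Stand-in for a real lookup: pretend only tm-3 runs the job.
        Optional<String> value = queryFirstAvailable(hosts, host -> {
            if (!host.equals("tm-3")) {
                throw new IllegalStateException("job not found on " + host);
            }
            return "state-from-" + host;
        });
        System.out.println(value.orElse("no task manager could answer"));
    }
}
```

This keeps the cluster sizing independent of the job's parallelism, at the
cost of extra round trips when the first hosts tried are idle.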


Re: Queryable State query deserialization problem

2019-11-12 Thread Congxian Qiu
Hi

Judging from the stack trace, this is most likely caused by mismatched
serializers. Please double-check the corresponding key/namespace serializers.

Best,
Congxian


On Tue, Nov 12, 2019 at 2:47 PM chengwenfeng wrote:

> Hi all,
> While testing the Queryable State feature, I found the following.
> With this syntax:
> dataStream.keyBy(key).process();  both simple state and complex POJOs can be queried.
> But with this syntax:
> studentAnswerDataStream.connect(learningStrategyDataStream)
> .keyBy(val->val.getCourseId()+"_"+val.getTaskId()
> , val->val.getCourseId()+"_"+val.getTaskId())
> .process()  simple state works, but complex POJOs cannot be deserialized.
>
>
>
>
> Error:
>
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Failed request 0.
>  Caused by: java.lang.RuntimeException: Failed request 9.
>  Caused by: java.lang.RuntimeException: Error while processing request
> with ID 9. Caused by: java.io.IOException: Unable to deserialize key and
> namespace. This indicates a mismatch in the key/namespace serializers used
> by the KvState instance and this access.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
> at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Unexpected magic number 48.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:99)
> ... 10 more
>
>
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at cn.unipus.flink.GetQueryableState2.main(GetQueryableState2.java:41)
> Caused by: java.lang.RuntimeException: Failed request 0.
>  Caused by: java.lang.RuntimeException: Failed request 9.
>  Caused by: java.lang.RuntimeException: Error while processing request
> with ID 9. Caused by: java.io.IOException: Unable to deserialize key and
> namespace. This indicates a mismatch in the key/namespace serializers used
> by the KvState instance and this access.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:109)
> at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:101)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
> at
> 

Re: Queryable state and TTL

2019-07-06 Thread Avi Levi
Thanks, I'll check it out.

On Sun, Jul 7, 2019 at 5:40 AM Eron Wright  wrote:

> *This Message originated outside your organization.*
> --
> Here's a PR for queryable state TLS that I closed because I didn't have
> time, and because I get the impression that the queryable state feature is
> used very often. Feel free to take it up, if you like.
> https://github.com/apache/flink/pull/6626
> 
>
> -Eron
>
> On Wed, Jul 3, 2019 at 11:21 PM Avi Levi  wrote:
>
>> Hi Yu,
>> Our sink is actually Kafka hence we cannot query it properly, from there
>> we distribute it to different consumers. We keep info in our state such as
>> entry time, some accumulated data etc' , this data is not kept elsewhere
>> hence we need to query our state.
>>
>> Best regards
>> Avi
>>
>>
>> On Thu, Jul 4, 2019 at 7:20 AM Yu Li  wrote:
>>
>>> Thanks for the ping Andrey.
>>>
>>> For me the general answer is yes, but TBH it will probably not be added
>>> in the foreseeable future due to lack of committer bandwidth (not only
>>> QueryableState with TTL but all about QueryableState module) as per
>>> Aljoscha pointed out in another thread [1].
>>>
>>> Although we could see emerging requirements and proposals on
>>> QueryableState recently, prioritizing is important for each open source
>>> project. And personally I think it may help if we could gather more and
>>> clearly describe the other-than-debugging use cases of QueryableState in
>>> production [2]. Could you share your case with us and why QueryableState is
>>> necessary rather than querying the data from sink @Avi? Thanks.
>>>
>>> [1] https://s.apache.org/MaOl
>>> [2] https://s.apache.org/hJDA
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Wed, 3 Jul 2019 at 23:13, Andrey Zagrebin 
>>> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> It is on the roadmap, but I am not aware of any contributor planning to
>>>> work on it for the next releases.
>>>> I think the community will first work on event-time support for TTL.
>>>> I will loop Yu in; maybe he has some plans to work on TTL for queryable
>>>> state.
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On Wed, Jul 3, 2019 at 3:17 PM Avi Levi
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Adding queryable state to state with TTL is not supported in 1.8.0
>>>>> (throwing java.lang.IllegalArgumentException: Queryable state is currently
>>>>> not supported with TTL)
>>>>>
>>>>> I saw in a previous mailing list thread that it is on the roadmap. Is it
>>>>> still on the roadmap?
>>>>>
>>>>> * There is a workaround, which is to use timers to clear the state, but
>>>>> in our case it means firing billions of timers on a daily basis, all at
>>>>> the same time, which seems not very efficient and might cause some
>>>>> resource issues.
>>>>>
>>>>> Cheers
>>>>> Avi
>>>>>
>>>>>
>>>>>


Re: Queryable state and TTL

2019-07-06 Thread Eron Wright
Here's a PR for queryable state TLS that I closed because I didn't have
time, and because I get the impression that the queryable state feature is
used very often. Feel free to take it up, if you like.
https://github.com/apache/flink/pull/6626

-Eron

On Wed, Jul 3, 2019 at 11:21 PM Avi Levi  wrote:

> Hi Yu,
> Our sink is actually Kafka hence we cannot query it properly, from there
> we distribute it to different consumers. We keep info in our state such as
> entry time, some accumulated data etc' , this data is not kept elsewhere
> hence we need to query our state.
>
> Best regards
> Avi
>
>
> On Thu, Jul 4, 2019 at 7:20 AM Yu Li  wrote:
>
>> Thanks for the ping Andrey.
>>
>> For me the general answer is yes, but TBH it will probably not be added
>> in the foreseeable future due to lack of committer bandwidth (not only
>> QueryableState with TTL but all about QueryableState module) as per
>> Aljoscha pointed out in another thread [1].
>>
>> Although we could see emerging requirements and proposals on
>> QueryableState recently, prioritizing is important for each open source
>> project. And personally I think it may help if we could gather more and
>> clearly describe the other-than-debugging use cases of QueryableState in
>> production [2]. Could you share your case with us and why QueryableState is
>> necessary rather than querying the data from sink @Avi? Thanks.
>>
>> [1] https://s.apache.org/MaOl
>> [2] https://s.apache.org/hJDA
>>
>> Best Regards,
>> Yu
>>
>>
>> On Wed, 3 Jul 2019 at 23:13, Andrey Zagrebin 
>> wrote:
>>
>>> Hi Avi,
>>>
>>> It is on the road map but I am not aware about plans of any contributor
>>> to work on it for the next releases.
>>> I think the community will firstly work on the event time support for
>>> TTL.
>>> I will loop Yu in, maybe he has some plans to work on TTL for the
>>> queryable state.
>>>
>>> Best,
>>> Andrey
>>>
>>> On Wed, Jul 3, 2019 at 3:17 PM Avi Levi  wrote:
>>>
>>>> Hi,
>>>> Adding queryable state to state with TTL is not supported in 1.8.0
>>>> (throwing java.lang.IllegalArgumentException: Queryable state is currently
>>>> not supported with TTL)
>>>>
>>>> I saw in a previous mailing list thread that it is on the roadmap. Is it
>>>> still on the roadmap?
>>>>
>>>> * There is a workaround, which is to use timers to clear the state, but
>>>> in our case it means firing billions of timers on a daily basis, all at
>>>> the same time, which seems not very efficient and might cause some
>>>> resource issues.
>>>>
>>>> Cheers
>>>> Avi
>>>>




Re: Queryable state and TTL

2019-07-04 Thread Avi Levi
Hi Yu,
Our sink is actually Kafka, so we cannot query it properly; from there we
distribute the data to different consumers. We keep information in our state
such as entry time and some accumulated data. This data is not kept
elsewhere, hence we need to query our state.

Best regards
Avi


On Thu, Jul 4, 2019 at 7:20 AM Yu Li  wrote:

> Thanks for the ping Andrey.
>
> For me the general answer is yes, but TBH it will probably not be added in
> the foreseeable future due to lack of committer bandwidth (not only
> QueryableState with TTL but all about QueryableState module) as per
> Aljoscha pointed out in another thread [1].
>
> Although we could see emerging requirements and proposals on
> QueryableState recently, prioritizing is important for each open source
> project. And personally I think it may help if we could gather more and
> clearly describe the other-than-debugging use cases of QueryableState in
> production [2]. Could you share your case with us and why QueryableState is
> necessary rather than querying the data from sink @Avi? Thanks.
>
> [1] https://s.apache.org/MaOl
> [2] https://s.apache.org/hJDA
>
> Best Regards,
> Yu
>
>
> On Wed, 3 Jul 2019 at 23:13, Andrey Zagrebin  wrote:
>
>> Hi Avi,
>>
>> It is on the road map but I am not aware about plans of any contributor
>> to work on it for the next releases.
>> I think the community will firstly work on the event time support for TTL.
>> I will loop Yu in, maybe he has some plans to work on TTL for the
>> queryable state.
>>
>> Best,
>> Andrey
>>
>> On Wed, Jul 3, 2019 at 3:17 PM Avi Levi  wrote:
>>
>>> Hi,
>>> Adding queryable state to state with TTL is not supported in 1.8.0
>>> (throwing java.lang.IllegalArgumentException: Queryable state is currently
>>> not supported with TTL)
>>>
>>> I saw in a previous mailing list thread that it is on the roadmap. Is it
>>> still on the roadmap?
>>>
>>> * There is a workaround, which is to use timers to clear the state, but in
>>> our case it means firing billions of timers on a daily basis, all at the
>>> same time, which seems not very efficient and might cause some resource
>>> issues.
>>>
>>> Cheers
>>> Avi
>>>
>>>
>>>


Re: Queryable state and TTL

2019-07-03 Thread Yu Li
Thanks for the ping Andrey.

For me the general answer is yes, but TBH it will probably not be added in
the foreseeable future due to lack of committer bandwidth (this applies not
only to QueryableState with TTL but to the whole QueryableState module), as
Aljoscha pointed out in another thread [1].

Although we could see emerging requirements and proposals on QueryableState
recently, prioritizing is important for each open source project. And
personally I think it may help if we could gather more and clearly describe
the other-than-debugging use cases of QueryableState in production [2].
Could you share your case with us and why QueryableState is necessary
rather than querying the data from sink @Avi? Thanks.

[1] https://s.apache.org/MaOl
[2] https://s.apache.org/hJDA

Best Regards,
Yu


On Wed, 3 Jul 2019 at 23:13, Andrey Zagrebin  wrote:

> Hi Avi,
>
> It is on the road map but I am not aware about plans of any contributor to
> work on it for the next releases.
> I think the community will firstly work on the event time support for TTL.
> I will loop Yu in, maybe he has some plans to work on TTL for the
> queryable state.
>
> Best,
> Andrey
>
> On Wed, Jul 3, 2019 at 3:17 PM Avi Levi  wrote:
>
>> Hi,
>> Adding queryable state to state with TTL is not supported in 1.8.0
>> (throwing java.lang.IllegalArgumentException: Queryable state is currently
>> not supported with TTL)
>>
>> I saw in a previous mailing list thread that it is on the roadmap. Is it
>> still on the roadmap?
>>
>> * There is a workaround, which is to use timers to clear the state, but in
>> our case it means firing billions of timers on a daily basis, all at the
>> same time, which seems not very efficient and might cause some resource
>> issues.
>>
>> Cheers
>> Avi
>>
>>
>>


Re: Queryable state and TTL

2019-07-03 Thread Andrey Zagrebin
Hi Avi,

It is on the roadmap, but I am not aware of any contributor planning to
work on it for the next releases.
I think the community will first work on event-time support for TTL.
I will loop Yu in; maybe he has some plans to work on TTL for queryable
state.

Best,
Andrey

On Wed, Jul 3, 2019 at 3:17 PM Avi Levi  wrote:

> Hi,
> Adding queryable state to state with TTL is not supported in 1.8.0
> (throwing java.lang.IllegalArgumentException: Queryable state is currently
> not supported with TTL)
>
> I saw in a previous mailing list thread that it is on the roadmap. Is it
> still on the roadmap?
>
> * There is a workaround, which is to use timers to clear the state, but in
> our case it means firing billions of timers on a daily basis, all at the
> same time, which seems not very efficient and might cause some resource
> issues.
>
> Cheers
> Avi
>
>
>
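
On the concern in the quoted message that timer-based cleanup fires all
timers at the same moment: one idea, sketched here under assumptions and not
proposed anywhere in this thread, is to add a small per-key offset to each
cleanup timestamp so that the timers are spread over a window. The price is
that state may live up to `spreadMillis` longer than the nominal TTL.
CleanupTimerSpread and cleanupTimestamp are hypothetical names, and the code
is plain Java that only computes the timestamp one would register.

```java
/** Hypothetical helper for spreading timer-based state cleanup over a time window. */
final class CleanupTimerSpread {

    /**
     * Returns the timestamp at which state for `key` should be cleared: the TTL
     * deadline plus a per-key offset in [0, spreadMillis). The offset is derived
     * from the key's hash code, so the same key always gets the same offset.
     */
    static long cleanupTimestamp(Object key, long lastUpdateMillis, long ttlMillis, long spreadMillis) {
        if (spreadMillis <= 0) {
            throw new IllegalArgumentException("spreadMillis must be positive");
        }
        // floorMod keeps the offset non-negative even for negative hash codes.
        long offset = Math.floorMod((long) key.hashCode(), spreadMillis);
        return lastUpdateMillis + ttlMillis + offset;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long oneDay = 24L * 60 * 60 * 1000;
        long tenMinutes = 10L * 60 * 1000;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + " -> " + cleanupTimestamp(key, now, oneDay, tenMinutes));
        }
    }
}
```

Whether this helps depends on how evenly the key hash codes are distributed
and on how much TTL slack the application can tolerate.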


Re: Queryable State race condition or serialization errors?

2019-05-27 Thread Tzu-Li (Gordon) Tai
Hi Burgess,

Would you be able to provide a minimal project that can reproduce your
error?
That would help a lot with figuring out the issue.
If you prefer to share that only privately, please feel free to send me a
private email with the details.
Another thing you can do is set logging level to "DEBUG". We have some
checks enabled at that level to see if serializers are being concurrently
used across threads.

Cheers,
Gordon

On Tue, May 21, 2019 at 9:59 PM burgesschen  wrote:

> Hi Gary.
>
> Thanks for the reply. I am using RocksDBStateBackend though.
>
> Best,
> Chen
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Queryable State race condition or serialization errors?

2019-05-21 Thread burgesschen
Hi Gary.

Thanks for the reply. I am using RocksDBStateBackend though. 

Best,
Chen



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Queryable State race condition or serialization errors?

2019-05-21 Thread Gary Yao
Hi Burgess Chen,

If you are using MemoryStateBackend or FsStateBackend, you can observe race
conditions on the state objects. However, the RocksDBStateBackend should be
safe from these issues [1].

Best,
Gary

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/queryable_state.html
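
For the heap-based backends mentioned above, the hazard is that a reader can
observe an object while it is being mutated. A general way to sidestep this
(a sketch under assumptions, not something this thread confirms as the fix
for the reported problem) is to never mutate the nested map in place and to
store a fresh copy on every update instead. NestedMapUpdate and withEntry are
hypothetical names; the code is plain Java and does not touch the Flink API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: copy-then-replace updates for a nested map value. */
final class NestedMapUpdate {

    /**
     * Returns a new unmodifiable map that contains every entry of `current`
     * plus the given key/value pair. `current` itself is left untouched, so a
     * concurrent reader holding the old map never sees a partial update.
     */
    static Map<String, String> withEntry(Map<String, String> current, String key, String value) {
        Map<String, String> copy = new HashMap<>(current);
        copy.put(key, value);
        return Collections.unmodifiableMap(copy);
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("name", "alice");
        Map<String, String> after = withEntry(before, "city", "berlin");
        System.out.println("before has " + before.size() + " entry, after has " + after.size());
    }
}
```

In a job, the returned map would then be written back to the state entry as
a whole rather than modifying the stored map object.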

On Tue, May 21, 2019 at 5:06 AM burgesschen  wrote:

> Hi Guys,
>
> I observed some strange behaviors while using Queryable state with Flink
> 1.6.2. Here is the story:
>
> My state is of type MapState[String, Map[String, String]]. The inner map is
> frequently updated. Upon querying, the returned inner map is sometimes
> missing some fields. What's more, sometimes the returned inner map has
> values that belong to other keys!
>
> Changing the type to MapState[String, String] seems to solve the problem.
>
> The code is a little too deep to dig into, but my guess is that when the
> state is updated and queried at the same time, a race condition can occur
> and corrupt the data. Please let me know if you have a better idea of what
> could be happening. Much appreciated!
>
> Best,
> Burgess Chen
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Queryable state formal release plan

2019-05-20 Thread Aljoscha Krettek
Hi,

Currently no committers (or PMC members) are focusing on the queryable state 
feature. This will probably mean that not much is going to happen there in the 
near future. However, there is some discussion on the development by the larger 
community about QS: 
https://lists.apache.org/thread.html/ad82be36f36ab674764b71c1319121b524c0052b3b14203c18557c11@%3Cdev.flink.apache.org%3E
 


Best,
Aljoscha

> On 20. May 2019, at 08:08, Praveen Chandna  
> wrote:
> 
> Hi 
>  
> As the Queryable state is in the Beta state, Can you Please confirm the plan 
> for the formal release of the feature Queryable state.
> Is there any timeline by when this would be added to Flink.
>  
> Thanks !!!
> 
>  
> /// Regards
> Praveen Chandna
> Product Owner,
> Mobile +91 9873597204  |  ECN: 2864



Re: Queryable state support in Flink 1.9

2019-04-16 Thread Boris Lublinsky
Thanks, that's it.

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Apr 16, 2019, at 8:31 AM, Guowei Ma  wrote:
> 
> AbstractQueryableStateTestBase



Re: Queryable state support in Flink 1.9

2019-04-16 Thread Guowei Ma
Hi,

1. I think MiniCluster supports queryable state.
2. You could set queryable-state.enable to true and try again.
You could also look at AbstractQueryableStateTestBase, which contains some tests.
:)
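
A minimal sketch of point 2, assuming the same kind of Configuration object
that the quoted MiniCluster setup builds. The key name queryable-state.enable
comes from the reply above, and Configuration.setBoolean(String, boolean) is
part of Flink's Configuration class. The class and method names below are
hypothetical, and the snippet needs flink-core on the classpath.

```java
import org.apache.flink.configuration.Configuration;

/** Hypothetical helper that switches on queryable state in a Flink Configuration. */
final class EnableQueryableState {

    /** Sets queryable-state.enable to true on the given configuration and returns it. */
    static Configuration withQueryableState(Configuration config) {
        // Key taken from the reply above; per the thread it is not enabled
        // automatically in the MiniCluster setup.
        config.setBoolean("queryable-state.enable", true);
        return config;
    }
}
```

The returned object would then be passed to MiniClusterConfiguration as in
the quoted setup. As far as I know, the flink-queryable-state-runtime module
also has to be on the classpath for the proxy to start, but please verify
that for your Flink version.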

Best,
Guowei


On Tue, Apr 16, 2019 at 9:09 PM Boris Lublinsky wrote:

> Thanks Guowei
> The questions that I am asking is slightly different:
> 1. Does Mini cluster support queryable state?
> 2. If the answer is yes, how to set it up?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Apr 15, 2019, at 12:07 AM, Guowei Ma  wrote:
>
> Hi,
> I think you should check TM log first and check if there are some info
> like:
> 1430 [main] INFO
> org.apache.flink.queryablestate.server.KvStateServerImpl  - Started
> Queryable State Server @ /127.0.0.1:9069.
> 1436 [main] INFO
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl  -
> Started Queryable State Proxy Server @ /127.0.0. 
>
>
> Best,
> Guowei
>
>
> On Mon, Apr 15, 2019 at 4:02 AM Boris Lublinsky wrote:
>
>> I was testing with Flink 1.9. Here is how I set up mini cluster
>>
>> int port = 6124;
>> int parallelism = 2;
>> Configuration config = new Configuration();
>> config.setInteger(JobManagerOptions.PORT, port);
>> config.setString(JobManagerOptions.ADDRESS, "localhost");
>> config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
>> // In a non MiniCluster setup queryable state is enabled by default.
>> config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
>> config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
>> config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
>>
>> config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
>> config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
>> config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
>>
>> MiniClusterConfiguration clusterconfig =
>> new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED,
>> null);
>> try {
>> // Create a local Flink server
>> MiniCluster flinkCluster = new MiniCluster(clusterconfig);
>> // Start server and create environment
>> flinkCluster.start();
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
>> env.setParallelism(parallelism);
>> // Build Graph
>> buildGraph(env);
>> JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>> // Submit to the server and wait for completion
>> JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
>> System.out.println("Job ID : " + result.getJobID());
>> Thread.sleep(Long.MAX_VALUE);
>> } catch (Throwable t){
>> t.printStackTrace();
>> }
>>
>> And have a client, that looks like follows:
>>
>> def query(job: String, keys: Seq[String], host: String = "127.0.0.1",
>> port: Int = 9069,
>> timeInterval: Long = defaulttimeInterval): Unit = {
>>
>> // JobID, has to correspond to a running job
>> val jobId = JobID.fromHexString(job)
>> // Client
>> val client = new QueryableStateClient(host, port)
>>
>> But when I tried it, it gives an exception that nothing is listening on
>> port 9069
>>
>> It works with the old FlinkLocalMiniCluster, but not with the MiniCluster
>>
>> Am I missing something?
>>
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>


Re: Queryable state support in Flink 1.9

2019-04-16 Thread Boris Lublinsky
Thanks Guowei
The questions I am asking are slightly different:
1. Does Mini cluster support queryable state?
2. If the answer is yes, how to set it up? 

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Apr 15, 2019, at 12:07 AM, Guowei Ma  wrote:
> 
> Hi,
> I think you should check TM log first and check if there are some info like:
> 1430 [main] INFO  org.apache.flink.queryablestate.server.KvStateServerImpl  - 
> Started Queryable State Server @ /127.0.0.1:9069 .
> 1436 [main] INFO  
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl  - 
> Started Queryable State Proxy Server @ /127.0.0. 
> 
> 
> Best,
> Guowei
> 
> 
> On Mon, Apr 15, 2019 at 4:02 AM Boris Lublinsky wrote:
> I was testing with Flink 1.9. Here is how I set up mini cluster
> 
> int port = 6124;
> int parallelism = 2;
> Configuration config = new Configuration();
> config.setInteger(JobManagerOptions.PORT, port);
> config.setString(JobManagerOptions.ADDRESS, "localhost");
> config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
> // In a non MiniCluster setup queryable state is enabled by default.
> config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
> config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
>
> config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
> config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
>
> MiniClusterConfiguration clusterconfig =
>     new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
> try {
>     // Create a local Flink server
>     MiniCluster flinkCluster = new MiniCluster(clusterconfig);
>     // Start server and create environment
>     flinkCluster.start();
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
>     env.setParallelism(parallelism);
>     // Build Graph
>     buildGraph(env);
>     JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>     // Submit to the server and wait for completion
>     JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
>     System.out.println("Job ID : " + result.getJobID());
>     Thread.sleep(Long.MAX_VALUE);
> } catch (Throwable t) {
>     t.printStackTrace();
> }
> 
> And have a client, that looks like follows:
> 
> def query(job: String, keys: Seq[String], host: String = "127.0.0.1",
>     port: Int = 9069, timeInterval: Long = defaulttimeInterval): Unit = {
>
>   // JobID, has to correspond to a running job
>   val jobId = JobID.fromHexString(job)
>   // Client
>   val client = new QueryableStateClient(host, port)
> 
> But when I tried it, it gives an exception that nothing is listening on port 
> 9069
> 
> It works with the old FlinkLocalMiniCluster, but not with the MiniCluster
> 
> Am I missing something?
> 
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 



Re: Queryable state support in Flink 1.9

2019-04-14 Thread Guowei Ma
Hi,
I think you should check the TM log first and see whether it contains entries like:
1430 [main] INFO  org.apache.flink.queryablestate.server.KvStateServerImpl
- Started Queryable State Server @ /127.0.0.1:9069.
1436 [main] INFO
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl  -
Started Queryable State Proxy Server @ /127.0.0.


Best,
Guowei


On Mon, Apr 15, 2019 at 4:02 AM Boris Lublinsky wrote:

> I was testing with Flink 1.9. Here is how I set up mini cluster
>
> int port = 6124;
> int parallelism = 2;
> Configuration config = new Configuration();
> config.setInteger(JobManagerOptions.PORT, port);
> config.setString(JobManagerOptions.ADDRESS, "localhost");
> config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
> // In a non MiniCluster setup queryable state is enabled by default.
> config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
> config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
>
> config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
> config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
>
> MiniClusterConfiguration clusterconfig =
> new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
> try {
> // Create a local Flink server
> MiniCluster flinkCluster = new MiniCluster(clusterconfig);
> // Start server and create environment
> flinkCluster.start();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
> env.setParallelism(parallelism);
> // Build Graph
> buildGraph(env);
> JobGraph jobGraph = env.getStreamGraph().getJobGraph();
> // Submit to the server and wait for completion
> JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
> System.out.println("Job ID : " + result.getJobID());
> Thread.sleep(Long.MAX_VALUE);
> } catch (Throwable t){
> t.printStackTrace();
> }
>
> And have a client, that looks like follows:
>
> def query(job: String, keys: Seq[String], host: String = "127.0.0.1",
> port: Int = 9069,
> timeInterval: Long = defaulttimeInterval): Unit = {
>
> // JobID, has to correspond to a running job
> val jobId = JobID.fromHexString(job)
> // Client
> val client = new QueryableStateClient(host, port)
>
> But when I tried it, it gives an exception that nothing is listening on
> port 9069
>
> It works with the old FlinkLocalMiniCluster, but not with the MiniCluster
>
> Am I missing something?
>
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-14 Thread Jayant Ameta
I tried to create a sample project, but couldn't reproduce the error! It
was working fine.
It turns out I was using the wrong Tuple2 package in my client code :(
After fixing it, the code worked fine.

Thanks Till and Jiayi for your help!

Jayant Ameta


On Tue, Nov 13, 2018 at 4:01 PM Till Rohrmann  wrote:

> Hi Jayant,
>
> could you maybe setup a small Github project with the client and server
> code? Otherwise it is really hard to reproduce the problem. Thanks a lot!
>
> Cheers,
> Till
>
> On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta 
> wrote:
>
>> Getting the same error even when I added flink-avro dependency to the
>> client.
>>
>> Jayant Ameta
>>
>>
>> On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy  wrote:
>>
>>> Hi Jayant,
>>>
>>>I don’t know why flink uses the Avro serializer, which is usually
>>> used in POJO class, but from the error messages, I think you can add
>>> flink-avro as a dependency and try again.
>>>
>>>
>>> Best,
>>>
>>> Jiayi Liao
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* bupt_ljy
>>> *Cc:* trohrmann; Tzu-Li (Gordon) Tai<
>>> tzuli...@apache.org>; user
>>> *Date:* Tuesday, Nov 13, 2018 16:15
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Thanks Jiayi,
>>> I updated the client code to use keyed stream key. The key is a
>>> Tuple2
>>>
>>> CompletableFuture> resultFuture =
>>> 
>>> client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), 
>>> "rules",
>>> Tuple2.of(uuid, "test"), TypeInformation.of(new 
>>> TypeHint>() {
>>> }), descriptor);
>>>
>>> I'm now getting a different exception. I'm NOT using Avro as a custom 
>>> serializer. Not sure what causes this issue.
>>>
>>> Caused by: java.lang.RuntimeException: Error while processing request with 
>>> ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find 
>>> required Avro dependency.
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>> at 
>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>> at 
>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>> at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>> at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>> at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolEx

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-13 Thread Till Rohrmann
Hi Jayant,

could you maybe setup a small Github project with the client and server
code? Otherwise it is really hard to reproduce the problem. Thanks a lot!

Cheers,
Till

On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta  wrote:

> Getting the same error even when I added flink-avro dependency to the
> client.
>
> Jayant Ameta
>
>
> On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy  wrote:
>
>> Hi Jayant,
>>
>> I don’t know why Flink is using the Avro serializer here, since it is
>> usually used for POJO classes, but judging from the error message, you
>> could add flink-avro as a dependency and try again.
>>
>>
>> Best,
>>
>> Jiayi Liao
>>
>>  Original Message
>> *Sender:* Jayant Ameta
>> *Recipient:* bupt_ljy
>> *Cc:* trohrmann; Tzu-Li (Gordon) Tai<
>> tzuli...@apache.org>; user
>> *Date:* Tuesday, Nov 13, 2018 16:15
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Thanks Jiayi,
>> I updated the client code to use the key of the keyed stream. The key is a
>> Tuple2<UUID, String>.
>>
>> CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>     client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>>         "rules",
>>         Tuple2.of(uuid, "test"),
>>         TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {}),
>>         descriptor);
>>
>> I'm now getting a different exception. I'm NOT using Avro as a custom
>> serializer. Not sure what causes this issue.
>>
>> Caused by: java.lang.RuntimeException: Error while processing request with 
>> ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find 
>> required Avro dependency.
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>  at 
>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>  at 
>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>  at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>  at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>  at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>  at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>  at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>  at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>  at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>  at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>  at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>  at 
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>  at 
>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>>  at 
>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>>  at 
>> org.apache.flink.queryablestate.network.Abs

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-13 Thread Jayant Ameta
Getting the same error even when I added flink-avro dependency to the
client.

Jayant Ameta


On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy  wrote:

> Hi Jayant,
>
> I don’t know why Flink is using the Avro serializer here, since it is
> usually used for POJO classes, but judging from the error message, you
> could add flink-avro as a dependency and try again.
>
>
> Best,
>
> Jiayi Liao
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* bupt_ljy
> *Cc:* trohrmann; Tzu-Li (Gordon) Tai<
> tzuli...@apache.org>; user
> *Date:* Tuesday, Nov 13, 2018 16:15
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Thanks Jiayi,
> I updated the client code to use the key of the keyed stream. The key is a
> Tuple2<UUID, String>.
>
> CompletableFuture<MapState<UUID, Rule>> resultFuture =
>     client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>         "rules",
>         Tuple2.of(uuid, "test"),
>         TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {}),
>         descriptor);
>
> I'm now getting a different exception. I'm NOT using Avro as a custom
> serializer. Not sure what causes this issue.
>
> Caused by: java.lang.RuntimeException: Error while processing request with ID 
> 21. Caused by: java.lang.UnsupportedOperationException: Could not find 
> required Avro dependency.
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>   at 
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>   at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>   at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>   at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
>   at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>   at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>   at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
>   at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>   at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>   at 
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>   at 
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>   at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
>
>   at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncReques

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-13 Thread bupt_ljy
Hi Jayant,
I don’t know why Flink is using the Avro serializer here, since it is usually
used for POJO classes, but judging from the error message, you could add
flink-avro as a dependency and try again.


Best,
Jiayi Liao


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: trohrmann <trohrm...@apache.org>; Tzu-Li (Gordon) Tai <tzuli...@apache.org>;
user <u...@flink.apache.org>
Date: Tuesday, Nov 13, 2018 16:15
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Thanks Jiayi,
I updated the client code to use the key of the keyed stream. The key is a
Tuple2<UUID, String>.

CompletableFuture<MapState<UUID, Rule>> resultFuture =
    client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
        "rules",
        Tuple2.of(uuid, "test"),
        TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {}),
        descriptor);

I'm now getting a different exception. I'm NOT using Avro as a custom
serializer. Not sure what causes this issue.


Caused by: java.lang.RuntimeException: Error while processing request with ID 
21. Caused by: java.lang.UnsupportedOperationException: Could not find required 
Avro dependency.
at 
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-13 Thread Jayant Ameta
java:617)
... 1 more

Jayant Ameta


On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy  wrote:

> Hi, Jayant
>
> The key you pass to getKvState should be the key of the keyed stream, not
> the key of the map. From what I’ve seen on
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
> this feature only supports managed keyed state.
>
> By the way, I think we should improve the error message that Jayant ran
> into.
>
> Best,
> Jiayi Liao
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* trohrmann
> *Cc:* bupt_ljy; Tzu-Li (Gordon) Tai;
> user
> *Date:* Tuesday, Nov 13, 2018 13:39
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Till,
> Here is the client snippet. Here Rule is a custom POJO that I use.
>
> public static void main(String[] args)
> throws IOException, InterruptedException, ExecutionException {
>   UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
>
>   QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
>   ExecutionConfig config = new ExecutionConfig();
>   client.setExecutionConfig(config);
>
>   MapStateDescriptor<UUID, Rule> descriptor =
>       new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
>   CompletableFuture<MapState<UUID, Rule>> resultFuture =
>       client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>           "rules", uuid, TypeInformation.of(UUID.class), descriptor);
>
>   while (!resultFuture.isDone()) {
>     Thread.sleep(1000);
>   }
>   resultFuture.whenComplete((result, throwable) -> {
>     if (throwable != null) {
>       throwable.printStackTrace();
>     } else {
>       try {
>         System.out.println(result.get(uuid));
>       } catch (Exception e) {
>         e.printStackTrace();
>       }
>     }
>   });
> }
>
>
> Below is the stack trace:
>
> Caused by: java.lang.RuntimeException: Error while processing request with
> ID 12. Caused by: java.io.IOException: Unable to deserialize key and
> namespace. This indicates a mismatch in the key/namespace serializers used
> by the KvState instance and this access.
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
> at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
> ... 9 more
>
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
> at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
> at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thre

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-12 Thread bupt_ljy
Hi, Jayant
The key you pass to getKvState should be the key of the keyed stream, not
the key of the map. From what I’ve seen on
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
this feature only supports managed keyed state.

By the way, I think we should improve the error message that Jayant ran into.
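
To illustrate why a bare UUID fails against a job keyed by (UUID, String), here is a minimal sketch using only the JDK. It is not Flink's actual serializer or wire format: the class KeyMismatchSketch and its methods are hypothetical stand-ins, assuming only that the server reads the key fields in order. It reproduces the shape of the EOFException in the trace quoted in this message, where the reader runs out of bytes when it reaches the String field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.UUID;

public class KeyMismatchSketch {

    // Mistaken client: serializes only the map key (a bare UUID).
    static byte[] writeUuidOnly(UUID uuid) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(uuid.getMostSignificantBits());
            out.writeLong(uuid.getLeastSignificantBits());
        }
        return bytes.toByteArray();
    }

    // Corrected client: serializes the full stream key (UUID, String).
    static byte[] writeTupleKey(UUID uuid, String tag) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(uuid.getMostSignificantBits());
            out.writeLong(uuid.getLeastSignificantBits());
            out.writeUTF(tag);
        }
        return bytes.toByteArray();
    }

    // Server side: always expects both fields of the stream key.
    static String readTupleKey(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            UUID uuid = new UUID(in.readLong(), in.readLong());
            String tag = in.readUTF(); // throws EOFException if the bytes end here
            return uuid + "/" + tag;
        }
    }

    public static void main(String[] args) throws IOException {
        UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

        // Matching key layout: the read succeeds.
        System.out.println(readTupleKey(writeTupleKey(uuid, "test")));

        // Mismatched key layout: the reader hits end of input.
        try {
            readTupleKey(writeUuidOnly(uuid));
        } catch (EOFException e) {
            System.out.println("EOFException: key bytes ended before the String field");
        }
    }
}
```

In the real client, the equivalent fix is to pass the keyed stream's key and its matching type information to getKvState.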


Best,
Jiayi Liao


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: trohrmann <trohrm...@apache.org>
Cc: bupt_ljy <bupt_...@163.com>; Tzu-Li (Gordon) Tai <tzuli...@apache.org>;
user <u...@flink.apache.org>
Date: Tuesday, Nov 13, 2018 13:39
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.


public static void main(String[] args)
    throws IOException, InterruptedException, ExecutionException {
  UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

  QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
  ExecutionConfig config = new ExecutionConfig();
  client.setExecutionConfig(config);

  MapStateDescriptor<UUID, Rule> descriptor =
      new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
  CompletableFuture<MapState<UUID, Rule>> resultFuture =
      client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
          "rules", uuid, TypeInformation.of(UUID.class), descriptor);

  while (!resultFuture.isDone()) {
    Thread.sleep(1000);
  }
  resultFuture.whenComplete((result, throwable) -> {
    if (throwable != null) {
      throwable.printStackTrace();
    } else {
      try {
        System.out.println(result.get(uuid));
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  });
}
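
As a side note, the polling loop followed by whenComplete could probably be replaced by blocking on the future with a timeout. The sketch below uses only the JDK; fakeQuery is a hypothetical stand-in for client.getKvState(...), and AwaitResultSketch is an illustrative name, not part of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AwaitResultSketch {

    // Hypothetical stand-in for client.getKvState(...): completes asynchronously.
    static CompletableFuture<String> fakeQuery(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("simulated query failure");
            }
            return "rule-value";
        });
    }

    // Waits at most timeoutSeconds and reports the value or the failure reason.
    static String await(CompletableFuture<String> future, long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // get() wraps the original failure; the cause is the server-side error.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(await(fakeQuery(false), 5));
        System.out.println(await(fakeQuery(true), 5));
    }
}
```

Blocking with a timeout avoids both the sleep loop and a client that hangs forever if the request never completes.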


Below is the stack trace:


Caused by: java.lang.RuntimeException: Error while processing request with ID 
12. Caused by: java.io.IOException: Unable to deserialize key and namespace. 
This indicates a mismatch in the key/namespace serializers used by the KvState 
instance and this access.
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at 
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more


at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-12 Thread Jayant Ameta
rent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at
org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at
org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)

Jayant Ameta


On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann  wrote:

> Could you send us a small example program which we can use to reproduce
> the problem?
>
> Cheers,
> Till
>
> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta  wrote:
>
>> Yeah, it IS using Kryo serializer.
>>
>> Jayant Ameta
>>
>>
>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Jayant, could you check that the UUID key on the TM is actually
>>> serialized using a Kryo serializer? You can do this by setting a breakpoint
>>> in the constructor of the `AbstractKeyedStateBackend`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:
>>>
>>>> Hi, Jayant
>>>>
>>>> Your code looks good to me, and I’ve tried serializing and
>>>> deserializing the UUID class with Kryo; it all works.
>>>>
>>>> I’m not sure what causes this problem. Maybe you can write a very
>>>> simple demo to check whether it works.
>>>>
>>>>
>>>> Jiayi Liao, Best
>>>>
>>>>  Original Message
>>>> *Sender:* Jayant Ameta
>>>> *Recipient:* bupt_ljy
>>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>>> user@flink.apache.org>
>>>> *Date:* Monday, Oct 29, 2018 11:53
>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>> Exception
>>>>
>>>> Hi Jiayi,
>>>> Any further help on this?
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta 
>>>> wrote:
>>>>
>>>>> MapStateDescriptor<UUID, String> descriptor =
>>>>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>>>>>
>>>>>> Hi,

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-09 Thread Till Rohrmann
Could you send us a small example program which we can use to reproduce the
problem?

Cheers,
Till

On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta  wrote:

> Yeah, it IS using Kryo serializer.
>
> Jayant Ameta
>
>
> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann  wrote:
>
>> Hi Jayant, could you check that the UUID key on the TM is actually
>> serialized using a Kryo serializer? You can do this by setting a breakpoint
>> in the constructor of the `AbstractKeyedStateBackend`.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:
>>
>>> Hi, Jayant
>>>
>>> Your code looks good to me, and I’ve tried serializing and
>>> deserializing the UUID class with Kryo; it all works.
>>>
>>> I’m not sure what causes this problem. Maybe you can write a very
>>> simple demo to check whether it works.
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* bupt_ljy
>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>> user@flink.apache.org>
>>> *Date:* Monday, Oct 29, 2018 11:53
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Hi Jiayi,
>>> Any further help on this?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta 
>>> wrote:
>>>
>>>> MapStateDescriptor<UUID, String> descriptor =
>>>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>Can you show us the descriptor in the codes below?
>>>>>
>>>>> client.getKvState(JobID.fromHexString(
>>>>> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>
>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>
>>>>>>
>>>>> Jiayi Liao, Best
>>>>>
>>>>>
>>>>>  Original Message
>>>>> *Sender:* Jayant Ameta
>>>>> *Recipient:* bupt_ljy
>>>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>>>> user@flink.apache.org>
>>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>> Exception
>>>>>
>>>>> Also, I haven't provided any custom serializer in my flink job.
>>>>> Shouldn't the same configuration work for queryable state client?
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
>>>>> wrote:
>>>>>
>>>>>> Hi Gordon,
>>>>>> Following is the stack trace that I'm getting:
>>>>>>
>>>>>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>>>>>> java.lang.RuntimeException: Failed request 0.*
>>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>>>>> * Caused by: java.lang.RuntimeException: Error while processing
>>>>>> request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>> Encountered unregistered class ID: -985346241*
>>>>>> *Serialization trace:*
>>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>>>>> * at
>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>>>>> * at
>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>>>>> * at
>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>>>>> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>>>>> * at
>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>>>>> * at
>>>>>> org.apache.flink.api.java.typeutils.runtime.T

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-08 Thread Jayant Ameta
Yeah, it IS using Kryo serializer.

Jayant Ameta


On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann  wrote:

> Hi Jayant, could you check that the UUID key on the TM is actually
> serialized using a Kryo serializer? You can do this by setting a breakpoint
> in the constructor of the `AbstractKeyedStateBackend`.
>
> Cheers,
> Till
>
> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:
>
>> Hi, Jayant
>>
>> Your code looks good to me. And I’ve tried the serialize/deserialize
>> of Kryo on UUID class, it all looks okay.
>>
>> I’m not very sure about this problem. Maybe you can write a very
>> simple demo to try if it works.
>>
>>
>> Jiayi Liao, Best
>>
>>  Original Message
>> *Sender:* Jayant Ameta
>> *Recipient:* bupt_ljy
>> *Cc:* Tzu-Li (Gordon) Tai; user<
>> user@flink.apache.org>
>> *Date:* Monday, Oct 29, 2018 11:53
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Hi Jiayi,
>> Any further help on this?
>>
>> Jayant Ameta
>>
>>
>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta 
>> wrote:
>>
>>> MapStateDescriptor<UUID, String> descriptor =
>>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>>>
>>>> Hi,
>>>>
>>>>Can you show us the descriptor in the codes below?
>>>>
>>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>>>     "rule",
>>>>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>
>>>> Jiayi Liao, Best
>>>>
>>>>
>>>>  Original Message
>>>> *Sender:* Jayant Ameta
>>>> *Recipient:* bupt_ljy
>>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>>> user@flink.apache.org>
>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>> Exception
>>>>
>>>> Also, I haven't provided any custom serializer in my flink job.
>>>> Shouldn't the same configuration work for queryable state client?
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
>>>> wrote:
>>>>
>>>>> Hi Gordon,
>>>>> Following is the stack trace that I'm getting:
>>>>>
>>>>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>>>>> java.lang.RuntimeException: Failed request 0.*
>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>>>> * Caused by: java.lang.RuntimeException: Error while processing
>>>>> request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException:
>>>>> Encountered unregistered class ID: -985346241*
>>>>> *Serialization trace:*
>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>>>> * at
>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>>>> * at
>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>>>> * at
>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>>>> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>>>> * at
>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>>>> * at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>>>>> * at
>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>>>>> * at
>>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>>>>> * at
>>>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>>>>> * at
>>>>> org.apache.flink.queryablestat

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-07 Thread Till Rohrmann
Hi Jayant, could you check that the UUID key on the TM is actually
serialized using a Kryo serializer? You can do this by setting a breakpoint
in the constructor of the `AbstractKeyedStateBackend`.

Cheers,
Till

On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy  wrote:

> Hi, Jayant
>
> Your code looks good to me. And I’ve tried the serialize/deserialize
> of Kryo on UUID class, it all looks okay.
>
> I’m not very sure about this problem. Maybe you can write a very
> simple demo to try if it works.
>
>
> Jiayi Liao, Best
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* bupt_ljy
> *Cc:* Tzu-Li (Gordon) Tai; user
> *Date:* Monday, Oct 29, 2018 11:53
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Jiayi,
> Any further help on this?
>
> Jayant Ameta
>
>
> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta  wrote:
>
>> MapStateDescriptor<UUID, String> descriptor =
>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>
>> Jayant Ameta
>>
>>
>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>>
>>> Hi,
>>>
>>>Can you show us the descriptor in the codes below?
>>>
>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>>     "rule",
>>>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>
>>> Jiayi Liao, Best
>>>
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* bupt_ljy
>>> *Cc:* Tzu-Li (Gordon) Tai; user<
>>> user@flink.apache.org>
>>> *Date:* Friday, Oct 26, 2018 02:26
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Also, I haven't provided any custom serializer in my flink job.
>>> Shouldn't the same configuration work for queryable state client?
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
>>> wrote:
>>>
>>>> Hi Gordon,
>>>> Following is the stack trace that I'm getting:
>>>>
>>>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>>>> java.lang.RuntimeException: Failed request 0.*
>>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>>> * Caused by: java.lang.RuntimeException: Error while processing request
>>>> with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
>>>> unregistered class ID: -985346241*
>>>> *Serialization trace:*
>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>>> * at
>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>>> * at
>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>>> * at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>>> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>>> * at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>>> * at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>>>> * at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>>>> * at
>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>>>> * at
>>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>>>> * at
>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
>>>> * at
>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
>>>> * at
>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
>>>> * at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
>>>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
>>>> * at
>>

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-30 Thread bupt_ljy
Hi, Jayant
  Your code looks good to me. I also tried serializing and deserializing a UUID
with Kryo, and that works fine.
  I'm not sure what is causing this problem. Maybe you can write a very simple
demo to check whether it works.
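
A very simple demo along those lines could start by ruling out the UUID value itself. The sketch below is neither Kryo nor Flink's own serialization code. It only round-trips the key from this thread through a 16-byte buffer with the JDK, on the assumption that any serializer for a UUID has to preserve these two longs. The names UuidRoundTrip, toBytes and fromBytes are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper class, not part of Flink or Kryo.
class UuidRoundTrip {

    // Writes the two 64-bit halves of the UUID into a 16-byte array.
    static byte[] toBytes(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return buf.array();
    }

    // Rebuilds the UUID from the 16 bytes produced by toBytes.
    static UUID fromBytes(byte[] bytes) {
        if (bytes.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long most = buf.getLong();
        long least = buf.getLong();
        return new UUID(most, least);
    }

    public static void main(String[] args) {
        // Same key as in the client call quoted in this thread.
        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");
        UUID copy = fromBytes(toBytes(key));
        System.out.println(key.equals(copy)); // prints "true"
    }
}
```

If a plain round trip like this works but the query still fails, the key value is probably not the issue, and the mismatch is more likely in how the job and the client each set up their serializers.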


Jiayi Liao, Best


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: Tzu-Li (Gordon) Tai <tzuli...@apache.org>; user <u...@flink.apache.org>
Date: Monday, Oct 29, 2018 11:53
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jiayi,
Any further help on this?


Jayant Ameta





On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta wittyam...@gmail.com wrote:

MapStateDescriptor<UUID, String> descriptor =
    new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
Jayant Ameta





On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy bupt_...@163.com wrote:

Hi,
 Can you show us the descriptor used in the code below?
  client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
      "rule",
      UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
      TypeInformation.of(new TypeHint<UUID>() {}), descriptor);


Jiayi Liao, Best




Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: Tzu-Li (Gordon) Tai <tzuli...@apache.org>; user <u...@flink.apache.org>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Also, I haven't provided any custom serializer in my flink job. Shouldn't the 
same configuration work for queryable state client?


Jayant Ameta





On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta wittyam...@gmail.com wrote:

Hi Gordon,
Following is the stack trace that I'm getting:


Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Error while processing request with ID 
0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


I am not using any custom serializer as mentioned by Jiayi.


Jayant Ameta





On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy bupt_...@163.com wrote:

Hi Jayant,
 There should be a Serializer parameter in the constructor of the 
StateDescriptor, you should create a new serializer like this:


 new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best




Original Message
Sender: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Recipient: Jayant Ameta <wittyam...@gmail.com>; bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jayant,


What is the Kryo exception message that you are getting?


Cheers,
Gordon




On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?


Jayant Ameta





On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy bupt_...@163.com wrote:

Hi,
 Your code seems right. Are you sure that you're using the same serializer as
the Flink program does? Could you show the serializer in the descriptor?





Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-28 Thread Jayant Ameta
Hi Jiayi,
Any further help on this?

Jayant Ameta


On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta  wrote:

> MapStateDescriptor<UUID, String> descriptor =
>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>
> Jayant Ameta
>
>
> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:
>
>> Hi,
>>
>>Can you show us the descriptor in the codes below?
>>
>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>     "rule",
>>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>
>> Jiayi Liao, Best
>>
>>
>>  Original Message
>> *Sender:* Jayant Ameta
>> *Recipient:* bupt_ljy
>> *Cc:* Tzu-Li (Gordon) Tai; user<
>> user@flink.apache.org>
>> *Date:* Friday, Oct 26, 2018 02:26
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Also, I haven't provided any custom serializer in my flink job. Shouldn't
>> the same configuration work for queryable state client?
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta 
>> wrote:
>>
>>> Hi Gordon,
>>> Following is the stack trace that I'm getting:
>>>
>>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>>> java.lang.RuntimeException: Failed request 0.*
>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>> * Caused by: java.lang.RuntimeException: Error while processing request
>>> with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
>>> unregistered class ID: -985346241*
>>> *Serialization trace:*
>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>> * at
>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>> * at
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>> * at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>> * at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>> * at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>>> * at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>>> * at
>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>>> * at
>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>>> * at
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
>>> * at
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
>>> * at
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
>>> * at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
>>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
>>> * at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
>>> * at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
>>> * at java.lang.Thread.run(Thread.java:748)*
>>>
>>> I am not using any custom serializer as mentioned by Jiayi.
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:
>>>
>>>> Hi  Jayant,
>>>>
>>>>   There should be a Serializer parameter in the constructor of the
>>>> StateDescriptor, you should create a new serializer like this:
>>>>
>>>>
>>>>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>
>>>>
>>>>  By the way, can you show us your kryo exception like what Gordon said?
>>>>
>>>>
>>>> Jiayi Liao, Best
>>>>
>>>>
>>>>
>>>>  Original Message
>>>> *Sender:* Tzu-Li (Gordon) Tai
>>>

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
MapStateDescriptor<UUID, String> descriptor =
    new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);

Jayant Ameta


On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy  wrote:

> Hi,
>
>Can you show us the descriptor in the codes below?
>
> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>     "rule",
>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>
> Jiayi Liao, Best
>
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* bupt_ljy
> *Cc:* Tzu-Li (Gordon) Tai; user
> *Date:* Friday, Oct 26, 2018 02:26
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Also, I haven't provided any custom serializer in my flink job. Shouldn't
> the same configuration work for queryable state client?
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta  wrote:
>
>> Hi Gordon,
>> Following is the stack trace that I'm getting:
>>
>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>> java.lang.RuntimeException: Failed request 0.*
>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>> * Caused by: java.lang.RuntimeException: Error while processing request
>> with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
>> unregistered class ID: -985346241*
>> *Serialization trace:*
>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>> * at
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>> * at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>> * at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>> * at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>> * at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>> * at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>> * at
>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>> * at
>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>> * at
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
>> * at
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
>> * at
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
>> * at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
>> * at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
>> * at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
>> * at java.lang.Thread.run(Thread.java:748)*
>>
>> I am not using any custom serializer as mentioned by Jiayi.
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:
>>
>>> Hi  Jayant,
>>>
>>>   There should be a Serializer parameter in the constructor of the
>>> StateDescriptor, you should create a new serializer like this:
>>>
>>>
>>>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>
>>>
>>>  By the way, can you show us your kryo exception like what Gordon said?
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>
>>>
>>>  Original Message
>>> *Sender:* Tzu-Li (Gordon) Tai
>>> *Recipient:* Jayant Ameta; bupt_ljy<
>>> bupt_...@163.com>
>>> *Cc:* user
>>> *Date:* Thursday, Oct 25, 2018 17:18
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Hi Jayant,
>>>
>>> What is the Kryo exception message that you are getting?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com)
>>> wrote:
>>>
>>> Hi,
>>>

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Hi,
 Can you show us the descriptor used in the code below?
  client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
      "rule",
      UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
      TypeInformation.of(new TypeHint<UUID>() {}), descriptor);


Jiayi Liao, Best




Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: bupt_ljy <bupt_...@163.com>
Cc: Tzu-Li (Gordon) Tai <tzuli...@apache.org>; user <u...@flink.apache.org>
Date: Friday, Oct 26, 2018 02:26
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Also, I haven't provided any custom serializer in my flink job. Shouldn't the 
same configuration work for queryable state client?


Jayant Ameta





On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta wittyam...@gmail.com wrote:

Hi Gordon,
Following is the stack trace that I'm getting:


Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Error while processing request with ID 
0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


I am not using any custom serializer as mentioned by Jiayi.


Jayant Ameta





On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy bupt_...@163.com wrote:

Hi Jayant,
 There should be a Serializer parameter in the constructor of the 
StateDescriptor, you should create a new serializer like this:


 new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best




Original Message
Sender: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Recipient: Jayant Ameta <wittyam...@gmail.com>; bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jayant,


What is the Kryo exception message that you are getting?


Cheers,
Gordon




On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?


Jayant Ameta





On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy bupt_...@163.com wrote:

Hi,
 Your code seems right. Are you sure that you're using the same serializer as
the Flink program does? Could you show the serializer in the descriptor?




Jiayi Liao, Best


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception


I get a Kryo exception when querying the state.


Key: UUID
MapState<UUID, String>


Client code snippet:


CompletableFuture<MapState<UUID, String>> resultFuture =
    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
        "rule",
        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
        TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?
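
One part of the query that can be tightened regardless of the serializer question is how the future is awaited. The following is a minimal JDK-only sketch, not Flink code: the `query` parameter stands in for the `CompletableFuture` returned by `client.getKvState(...)`, and `QueryAwait` and `awaitOrDescribe` are hypothetical names. It separates a timeout from a failure reported through the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of the queryable state client.
class QueryAwait {

    // Waits for the result and maps each outcome to a short description.
    static <T> String awaitOrDescribe(CompletableFuture<T> query, long timeout, TimeUnit unit) {
        try {
            return "ok: " + query.get(timeout, unit);
        } catch (TimeoutException e) {
            // No answer within the time limit.
            return "timeout";
        } catch (ExecutionException e) {
            // The future completed exceptionally; the real error is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Simulates a request that fails, using the message from the trace in this thread.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("Failed request 0."));
        System.out.println(awaitOrDescribe(failed, 10, TimeUnit.SECONDS));

        // Simulates a request that is never answered.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(awaitOrDescribe(pending, 50, TimeUnit.MILLISECONDS));
    }
}
```

Handling the two cases separately makes it easier to tell an unreachable or slow server apart from a request that was processed and rejected.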




Jayant Ameta

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
Also, I haven't provided any custom serializer in my flink job. Shouldn't
the same configuration work for queryable state client?

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta  wrote:

> Hi Gordon,
> Following is the stack trace that I'm getting:
>
> *Exception in thread "main" java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Failed request 0.*
> * Caused by: java.lang.RuntimeException: Failed request 0.*
> * Caused by: java.lang.RuntimeException: Error while processing request
> with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered
> unregistered class ID: -985346241*
> *Serialization trace:*
> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
> * at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
> * at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
> * at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
> * at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
> * at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
> * at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
> * at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
> * at
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
> * at
> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
> * at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
> * at
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
> * at
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
> * at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
> * at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
> * at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
> * at java.lang.Thread.run(Thread.java:748)*
>
> I am not using any custom serializer as mentioned by Jiayi.
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:
>
>> Hi  Jayant,
>>
>>   There should be a Serializer parameter in the constructor of the
>> StateDescriptor, you should create a new serializer like this:
>>
>>
>>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>
>>
>>  By the way, can you show us your kryo exception like what Gordon said?
>>
>>
>> Jiayi Liao, Best
>>
>>
>>
>>  Original Message
>> *Sender:* Tzu-Li (Gordon) Tai
>> *Recipient:* Jayant Ameta; bupt_ljy<
>> bupt_...@163.com>
>> *Cc:* user
>> *Date:* Thursday, Oct 25, 2018 17:18
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Hi Jayant,
>>
>> What is the Kryo exception message that you are getting?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com)
>> wrote:
>>
>> Hi,
>> I've not configured any serializer in the descriptor. (Neither in flink
>> job, nor in state query client).
>> Which serializer should I use?
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:
>>
>>> Hi,
>>>
>>>    Your code seems right. Are you sure that you're using the same
>>> serializer as the Flink program does? Could you show the serializer
>>> in the descriptor?
>>>
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta
>>> *Recipient:* user
>>> *Date:* Thursday, Oct 25, 2018 14:17
>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>
>>> I get a Kryo exception when querying the state.
>>>
>>> Key: UUID
>>> MapState<UUID, String>
>>>
>>> Client code snippet:
>>>
>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>>         "rule",
>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>
>>>
>>> Any better way to query it?
>>>
>>>
>>> Jayant Ameta
>>>
>>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
Hi Gordon,
Following is the stack trace that I'm getting:

Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Failed request 0.
 Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
    at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
    at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
    at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

I am not using any custom serializer of the kind Jiayi mentioned.

Jayant Ameta


On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy  wrote:

> Hi  Jayant,
>
>   There should be a Serializer parameter in the constructor of the
> StateDescriptor, you should create a new serializer like this:
>
>
>new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>
>
>  By the way, can you show us your kryo exception like what Gordon said?
>
>
> Jiayi Liao, Best
>
>
>
>  Original Message
> *Sender:* Tzu-Li (Gordon) Tai
> *Recipient:* Jayant Ameta; bupt_ljy >
> *Cc:* user
> *Date:* Thursday, Oct 25, 2018 17:18
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Jayant,
>
> What is the Kryo exception message that you are getting?
>
> Cheers,
> Gordon
>
>
> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com)
> wrote:
>
> Hi,
> I've not configured any serializer in the descriptor. (Neither in flink
> job, nor in state query client).
> Which serializer should I use?
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:
>
>> Hi,
>>
>>It seems that your codes are right. Are you sure that you’re using the
>> same Serializer as the Flink program do? Could you show the serializer in
>> descriptor?
>>
>>
>>
>> Jiayi Liao, Best
>>
>>  Original Message
>> *Sender:* Jayant Ameta
>> *Recipient:* user
>> *Date:* Thursday, Oct 25, 2018 14:17
>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>
>> I get a Kryo exception when querying the state.
>>
>> Key: UUID
>> MapState<UUID, String>
>>
>> Client code snippet:
>>
>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>         "rule",
>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>
>>
>> Any better way to query it?
>>
>>
>> Jayant Ameta
>>
>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Hi Jayant,
 The constructor of the StateDescriptor takes a Serializer parameter; you can 
create a new serializer like this:


 new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


By the way, can you show us your Kryo exception, as Gordon asked?


Jiayi Liao, Best




Original Message
Sender: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Recipient: Jayant Ameta <wittyam...@gmail.com>; bupt_ljy <bupt_...@163.com>
Cc: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 17:18
Subject: Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jayant,


What is the Kryo exception message that you are getting?


Cheers,
Gordon




On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?


Jayant Ameta





On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com> wrote:

Hi,
 It seems that your codes are right. Are you sure that you’re using the same 
Serializer as the Flink program do? Could you show the serializer in descriptor?




Jiayi Liao, Best


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception


I get a Kryo exception when querying the state.

Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
        "rule",
        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
        TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?




Jayant Ameta

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Tzu-Li (Gordon) Tai
Hi Jayant,

What is the Kryo exception message that you are getting?

Cheers,
Gordon


On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:

Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:
Hi,
   It seems that your codes are right. Are you sure that you’re using the same 
Serializer as the Flink program do? Could you show the serializer in 
descriptor? 


Jiayi Liao, Best

 Original Message 
Sender: Jayant Ameta
Recipient: user
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception

I get a Kryo exception when querying the state.

Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
        "rule",
        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
        TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);

Any better way to query it?


Jayant Ameta


Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread Jayant Ameta
Hi,
I've not configured any serializer in the descriptor. (Neither in flink
job, nor in state query client).
Which serializer should I use?

Jayant Ameta


On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy  wrote:

> Hi,
>
>It seems that your codes are right. Are you sure that you’re using the
> same Serializer as the Flink program do? Could you show the serializer in
> descriptor?
>
>
>
> Jiayi Liao, Best
>
>  Original Message
> *Sender:* Jayant Ameta
> *Recipient:* user
> *Date:* Thursday, Oct 25, 2018 14:17
> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>
> I get a Kryo exception when querying the state.
>
> Key: UUID
> MapState<UUID, String>
>
> Client code snippet:
>
> CompletableFuture<MapState<UUID, String>> resultFuture =
>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>         "rule",
>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>
>
> Any better way to query it?
>
>
> Jayant Ameta
>


Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Hi,
 It seems that your code is right. Are you sure that you’re using the same 
serializer as the Flink program does? Could you show the serializer in the descriptor?
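
To illustrate why the serializer question matters, here is a minimal plain-Java sketch. It does not use Flink's API; the class and method names are made up for the illustration. It assumes the job encodes the UUID key in one byte format while the querying client uses another, so the server side cannot decode the key it receives.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical illustration only: two incompatible encodings of the same UUID key.
public class KeySerializerMismatch {

    // Encoding A: fixed 16-byte binary form, most significant bits first.
    public static byte[] encodeBinary(UUID id) {
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    // Decoder for encoding A; rejects input that is not exactly 16 bytes.
    public static UUID decodeBinary(byte[] bytes) {
        if (bytes.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long most = buf.getLong();
        long least = buf.getLong();
        return new UUID(most, least);
    }

    // Encoding B: canonical text form as UTF-8 bytes.
    public static byte[] encodeText(UUID id) {
        return id.toString().getBytes(StandardCharsets.UTF_8);
    }

    // True if the "server" (which only understands encoding A) recovers the key.
    public static boolean roundTrips(byte[] wire, UUID expected) {
        try {
            return decodeBinary(wire).equals(expected);
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");
        // Client and server agree on the encoding.
        System.out.println("same serializer: " + roundTrips(encodeBinary(key), key));
        // Client uses a different encoding than the server expects.
        System.out.println("different serializer: " + roundTrips(encodeText(key), key));
    }
}
```

The same reasoning applies to the real client: the key type information passed to getKvState and the serializers in the descriptor have to produce bytes the job can read back.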




Jiayi Liao, Best


Original Message
Sender: Jayant Ameta <wittyam...@gmail.com>
Recipient: user <u...@flink.apache.org>
Date: Thursday, Oct 25, 2018 14:17
Subject: Queryable state when key is UUID - getting Kyro Exception


I get a Kryo exception when querying the state.

Key: UUID
MapState<UUID, String>

Client code snippet:

CompletableFuture<MapState<UUID, String>> resultFuture =
    client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
        "rule",
        UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
        TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?




Jayant Ameta

Re: Queryable State

2018-09-05 Thread Sameer Wadkar
I have used connected streams where one part of the connected stream maintains 
state and the other part consumes it. 

However, it was not queryable externally. For state that is queryable externally, 
you are right: you probably need another operator to store the state and support 
queryability. 

Sent from my iPhone

> On Sep 5, 2018, at 7:26 AM, Darshan Singh  wrote:
> 
> Hi All,
> 
> I was playing with queryable state. As a queryable stream cannot be modified, 
> how do I use the output of, say, my reduce function for further processing?
> 
> Below is one example. I can see that I am processing the data twice: once for 
> the queryable stream and once for my original stream. Does that mean the state 
> will be kept twice as well?
> 
> In simple terms, I would like to query the state from the rf reduce function, 
> and I would like my stream to be written to Kafka. With the code below I am 
> able to do so, but it seems to me that the state is stored twice and that 
> duplicate work is being done.
> 
> Is there an alternative way to achieve this?
> 
> Thanks
> 
> public class reducetest {
> 
>     public static void main(String[] args) throws Exception {
> 
>         // set up streaming execution environment
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         env.setParallelism(1);
>         List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> 
>         ReducingStateDescriptor<Tuple2<Integer, Integer>> descriptor =
>                 new ReducingStateDescriptor<Tuple2<Integer, Integer>>(
>                         "sum", // the state name
>                         new rf(),
>                         TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})); // type information
>         descriptor.setQueryable("reduce");
> 
>         DataStream<Tuple2<Integer, Integer>> ds1 =
>                 env.fromCollection(nums).map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
>                     @Override
>                     public Tuple2<Integer, Integer> map(Integer integer) throws Exception {
>                         return Tuple2.of(integer % 2, integer);
>                     }
>                 }).keyBy(0);
> 
>         ((KeyedStream) ds1).asQueryableState("reduce", descriptor);
> 
>         DataStream ds2 = ((KeyedStream) ds1).reduce(descriptor.getReduceFunction());
> 
>         //ds1.print();
>         ds2.print();
> 
>         System.out.println(env.getExecutionPlan());
> 
>         env.execute("re");
>     }
> 
>     static class rf implements ReduceFunction<Tuple2<Integer, Integer>> {
> 
>         @Override
>         public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> e,
>                 Tuple2<Integer, Integer> n) throws Exception {
>             return Tuple2.of(e.f0, e.f1 + n.f1);
>         }
>     }
> 
> }


Re: Queryable state and state TTL

2018-08-29 Thread Andrey Zagrebin
Hi,

Fabian is right: supporting TTL for queryable state needs some extra effort 
because of specifics in how it interacts with state objects, but there is 
no fundamental problem. It is on the roadmap for future releases.

Best,
Andrey

> On 29 Aug 2018, at 09:30, Fabian Hueske  wrote:
> 
> Hi,
> 
> I guess that this is not a fundamental problem but just a limitation in the 
> current implementation.
> Andrey (in CC) who implemented the TTL support should be able to give more 
> insight on this issue.
> 
> Best, Fabian
> 
> On Wed, Aug 29, 2018 at 04:06, vino yang wrote:
> Hi Elias,
> 
> From the source code, the reason for throwing this exception is because 
> StateTtlConfig is set to StateTtlConfig.DISABLED. 
> Please refer to the usage and description of the official Flink documentation 
> for details.[1]
> 
> And there is a note you should pay attention : Only TTLs in reference to 
> processing time are currently supported.
> 
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>  
> 
> 
> Thanks, vino.
> 
> On Wed, Aug 29, 2018 at 6:03 AM, Elias Levy <fearsome.lucid...@gmail.com> wrote:
> Is there a reason queryable state can't work with state TTL?  Trying to use 
> both at the same time leads to a "IllegalArgumentException: Queryable state 
> is currently not supported with TTL"
> 
> 



Re: Queryable state and state TTL

2018-08-29 Thread Fabian Hueske
Hi,

I guess that this is not a fundamental problem but just a limitation in the
current implementation.
Andrey (in CC) who implemented the TTL support should be able to give more
insight on this issue.

Best, Fabian

On Wed, Aug 29, 2018 at 04:06, vino yang wrote:

> Hi Elias,
>
> From the source code, the reason for throwing this exception is because
> StateTtlConfig is set to StateTtlConfig.DISABLED.
> Please refer to the usage and description of the official Flink
> documentation for details.[1]
>
> And there is a note you should pay attention : Only TTLs in reference to 
> *processing
> time* are currently supported.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>
> Thanks, vino.
>
> On Wed, Aug 29, 2018 at 6:03 AM, Elias Levy wrote:
>
>> Is there a reason queryable state can't work with state TTL?  Trying to
>> use both at the same time leads to a "IllegalArgumentException: Queryable
>> state is currently not supported with TTL"
>>
>>
>>


Re: Queryable state and state TTL

2018-08-28 Thread vino yang
Hi Elias,

From the source code, this exception is thrown unless the descriptor's
StateTtlConfig is StateTtlConfig.DISABLED, i.e. whenever TTL is enabled on
state that is also made queryable.
Please refer to the usage notes and description in the official Flink
documentation for details.[1]
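
A minimal plain-Java sketch of that kind of guard follows. It is a simplified stand-in, not Flink's StateDescriptor: the `Descriptor` class and the `canBeQueryable` helper are hypothetical names, and the check is an assumption modelled on the error message reported in this thread.

```java
// Hypothetical illustration only; not Flink's implementation.
public class QueryableTtlGuard {

    /** Simplified stand-in for a state descriptor. */
    public static final class Descriptor {
        private boolean ttlEnabled;
        private String queryableName;

        public void enableTtl() {
            ttlEnabled = true;
        }

        public void setQueryable(String name) {
            // Assumed guard: refuse to expose state whose TTL is enabled.
            if (ttlEnabled) {
                throw new IllegalArgumentException(
                        "Queryable state is currently not supported with TTL");
            }
            queryableName = name;
        }

        public boolean isQueryable() {
            return queryableName != null;
        }
    }

    /** Returns true if a descriptor with the given TTL setting can be made queryable. */
    public static boolean canBeQueryable(boolean withTtl) {
        Descriptor descriptor = new Descriptor();
        if (withTtl) {
            descriptor.enableTtl();
        }
        try {
            descriptor.setQueryable("reduce");
            return descriptor.isQueryable();
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("TTL disabled: " + canBeQueryable(false));
        System.out.println("TTL enabled:  " + canBeQueryable(true));
    }
}
```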

Also note that only TTLs in reference to *processing time* are currently
supported.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl

Thanks, vino.

On Wed, Aug 29, 2018 at 6:03 AM, Elias Levy wrote:

> Is there a reason queryable state can't work with state TTL?  Trying to
> use both at the same time leads to a "IllegalArgumentException: Queryable
> state is currently not supported with TTL"
>
>
>


Re: Queryable State

2018-03-27 Thread Vishal Santoshi
Thank you for the clarification.

On Wed, Mar 21, 2018, 4:28 AM Kostas Kloudas 
wrote:

> Hi Vishal,
>
> As Fabian said, queryable state is just a feature that exposes the state
> kept within Flink; it is not meant to replace functionality that would
> otherwise be provided by a sink. The functionality will definitely evolve
> in the future, but for now there are no discussions about keeping the
> state of a job available after the job is done.
>
> To do that with exactly-once semantics and all the guarantees provided
> by Flink, I would recommend using an external sink.
>
> Cheers,
> Kostas
>
> On Mar 19, 2018, at 6:18 PM, Vishal Santoshi 
> wrote:
>
> Thank you. These do look like show stoppers for us.  But again thank you.
>
> On Mon, Mar 19, 2018 at 10:31 AM, Fabian Hueske  wrote:
>
>> AFAIK, there have been discussions to replicate state among TMs to speed
>> up recovery (and improve availability).
>> However, I'm not aware of plans to implement that.
>>
>> I don't think serving state while a job is down has been considered yet.
>>
>> 2018-03-19 15:17 GMT+01:00 Vishal Santoshi :
>>
>>> Are there plans to address all or few of the above apart from the  "JM
>>> LB not possible" which seems understandable ?
>>>
>>> On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske 
>>> wrote:
>>>
>>>> Queryable state is "just" an additional feature on regular keyed state.
>>>> i.e., the only difference is that you can read the state from an outside
>>>> application.
>>>> Besides that it behaves exactly like regular application state
>>>>
>>>> Queryable state is (at the moment) designed to be accessible if a job
>>>> runs.
>>>> If the job fails (and recovers) or is manually taken down for
>>>> maintenance, the state cannot be queried.
>>>> It's not possible to put a load balancer in front of a JobManager. Only
>>>> one JM is the active master that maintains a running job.
>>>> State is also not replicated.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi :
>>>>
>>>>> Those are understandable. I am more interested in a few things ( and
>>>>> maybe more that could be added )
>>>>>
>>>>> * As far as I can understand JM is the SPOF. Does HA become a
>>>>> necessity ?
>>>>> * If there are 2 or more JM could we theoretically have a LB fronting
>>>>> them ? Thus it is a peer to peer access ( Cassandra ) or a master slave
>>>>> setup for JM HA specifically for Queryable access ( For  flink jobs it is
>>>>> master slave )
>>>>> * Do we replicate state to other TMs for read optimization (
>>>>> specifically to avoid Hot Node issues ) ?
>>>>> * If the job goes down it seems the state is not accessible. What
>>>>> plans do we have to "separate concerns" for Queryable state.
>>>>>
>>>>> We consider Queryable State a significant feature Flink provides and
>>>>> would do the necessary leg work if there are certain gaps in it being
>>>>> truly considered a Highly Available Key Value store.
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> In general, Queryable State should be ready to use.
>>>>>> There are a few things to consider though:
>>>>>>
>>>>>> - State queries are not synchronized with the application code, i.e.,
>>>>>> they can happen at the same time. Therefore, the Flink application should
>>>>>> not modify objects that have been put into or read from the state if you
>>>>>> are not using the RocksDBStatebackend (which creates copies by
>>>>>> deserialization).
>>>>>> - State will be rolled back after a failure. Hence, you can read
>>>>>> writes that are not "committed by a checkpoint".
>>>>>>
>>>>>> @Kostas, did I forget something?
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>>>>>>
>>>>>>> To be more precise, is anything similar to
>>>>>>> https://engineering.linkedin.com/blog/2018/03/air-traffic-controller--member-first-notifications-at-linkedin
>>>>>>> . done in Samza, can be replicated with production level guarantees with
>>>>>>> Flink Queryable state ( as it stands currently version 1.5 )  ?
>>>>>>>
>>>>>>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> We are making few decisions on use cases where  Queryable state is
>>>>>>>> a natural fit
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>>>>>>>>
>>>>>>>> Is Queryable state production ready ? We will go to 1.5 Flink if
>>>>>>>> that helps to make the case for the usage.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>


Re: Queryable State

2018-03-21 Thread Kostas Kloudas
Hi Vishal,

As Fabian said, queryable state is just a feature that exposes the state kept 
within Flink; it is not meant to replace functionality that would otherwise 
be provided by a sink. The functionality will definitely evolve in the future, 
but for now there are no discussions about keeping the state of a job 
available after the job is done.

To do that with exactly-once semantics and all the guarantees provided by 
Flink, I would recommend using an external sink.

Cheers,
Kostas

> On Mar 19, 2018, at 6:18 PM, Vishal Santoshi  
> wrote:
> 
> Thank you. These do look like show stoppers for us.  But again thank you.
> 
> On Mon, Mar 19, 2018 at 10:31 AM, Fabian Hueske wrote:
> AFAIK, there have been discussions to replicate state among TMs to speed up 
> recovery (and improve availability).
> However, I'm not aware of plans to implement that.
> 
> I don't think serving state while a job is down has been considered yet.
> 
> 2018-03-19 15:17 GMT+01:00 Vishal Santoshi:
> Are there plans to address all or few of the above apart from the  "JM LB not 
> possible" which seems understandable ? 
> 
> On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske wrote:
> Queryable state is "just" an additional feature on regular keyed state. i.e., 
> the only difference is that you can read the state from an outside 
> application.
> Besides that it behaves exactly like regular application state
> 
> Queryable state is (at the moment) designed to be accessible if a job runs.
> If the job fails (and recovers) or is manually taken down for maintenance, 
> the state cannot be queried.
> It's not possible to put a load balancer in front of a JobManager. Only one 
> JM is the active master that maintains a running job.
> State is also not replicated. 
> 
> Best, Fabian
> 
> 
> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi:
> Those are understandable. I am more interested in a few things ( and may be 
> more that could be added ) 
> 
> * As far as I can understand JM is the SPOF. Does HA become a necessity ?
> * If there are 2 or more JM could we theoretically have a LB fronting them ? 
> Thus it is a peer to peer access ( Cassandra ) or a master slave setup for JM 
> HA specifically for Queryable access ( For  flink jobs it is master slave ) 
> * Do we replicate state to other TMs for read optimization ( specifically to 
> avoid Hot Node issues ) ?
> * If the job goes down it seems the state is not accessible. What plans to we 
> have to "separate concerns" for Queryable state.
> 
> We consider Queryable State significant a feature Flink provides and would do 
> the necessary leg work if there are certain gaps in it being trully 
> considered a Highly Available Key Value store.
> 
> Regards.
> 
> 
> 
>  
> 
> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske wrote:
> Hi Vishal,
> 
> In general, Queryable State should be ready to use. 
> There are a few things to consider though:
> 
> - State queries are not synchronized with the application code, i.e., they 
> can happen at the same time. Therefore, the Flink application should not 
> modify objects that have been put into or read from the state if you are not 
> using the RocksDBStatebackend (which creates copies by deserialization).
> - State will be rolled back after a failure. Hence, you can read writes that 
> are not "committed by a checkpoint". 
> 
> @Kostas, did I forget something?
> 
> Best, Fabian
> 
> 
> 
> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi:
> To be more precise, can anything similar to the following, done in Samza, 
> be replicated with production-level guarantees with Flink Queryable State 
> ( as it currently stands in version 1.5 ) ? 
> https://engineering.linkedin.com/blog/2018/03/air-traffic-controller--member-first-notifications-at-linkedin
> 
> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi wrote:
> We are making a few decisions on use cases where Queryable State is a natural 
> fit: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
> 
> Is Queryable State production ready? We will go to Flink 1.5 if that helps to 
> make the case for the usage.
> 
> 
> 
> 
> 
> 
> 



Re: Queryable State

2018-03-19 Thread Vishal Santoshi
Thank you. These do look like show stoppers for us.  But again thank you.

On Mon, Mar 19, 2018 at 10:31 AM, Fabian Hueske  wrote:

> AFAIK, there have been discussions to replicate state among TMs to speed
> up recovery (and improve availability).
> However, I'm not aware of plans to implement that.
>
> I don't think serving state while a job is down has been considered yet.
>
> 2018-03-19 15:17 GMT+01:00 Vishal Santoshi :
>
>> Are there plans to address all or few of the above apart from the  "JM LB
>> not possible" which seems understandable ?
>>
>> On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske  wrote:
>>
>>> Queryable state is "just" an additional feature on regular keyed state.
>>> i.e., the only difference is that you can read the state from an outside
>>> application.
>>> Besides that it behaves exactly like regular application state
>>>
>>> Queryable state is (at the moment) designed to be accessible if a job
>>> runs.
>>> If the job fails (and recovers) or is manually taken down for
>>> maintenance, the state cannot be queried.
>>> It's not possible to put a load balancer in front of a JobManager. Only
>>> one JM is the active master that maintains a running job.
>>> State is also not replicated.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi :
>>>
>>>> Those are understandable. I am more interested in a few things ( and
>>>> maybe more that could be added )
>>>>
>>>> * As far as I can understand JM is the SPOF. Does HA become a necessity
>>>> ?
>>>> * If there are 2 or more JM could we theoretically have a LB fronting
>>>> them ? Thus it is a peer to peer access ( Cassandra ) or a master slave
>>>> setup for JM HA specifically for Queryable access ( For  flink jobs it is
>>>> master slave )
>>>> * Do we replicate state to other TMs for read optimization (
>>>> specifically to avoid Hot Node issues ) ?
>>>> * If the job goes down it seems the state is not accessible. What plans
>>>> do we have to "separate concerns" for Queryable state.
>>>>
>>>> We consider Queryable State a significant feature Flink provides and
>>>> would do the necessary leg work if there are certain gaps in it being
>>>> truly considered a Highly Available Key Value store.
>>>>
>>>> Regards.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> In general, Queryable State should be ready to use.
>>>>> There are a few things to consider though:
>>>>>
>>>>> - State queries are not synchronized with the application code, i.e.,
>>>>> they can happen at the same time. Therefore, the Flink application should
>>>>> not modify objects that have been put into or read from the state if you
>>>>> are not using the RocksDBStatebackend (which creates copies by
>>>>> deserialization).
>>>>> - State will be rolled back after a failure. Hence, you can read
>>>>> writes that are not "committed by a checkpoint".
>>>>>
>>>>> @Kostas, did I forget something?
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>>
>>>>>
>>>>> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>>>>>
>>>>>> To be more precise, is anything similar to
>>>>>> https://engineering.linkedin.com/blog/2018/03/air-traffic-controller--member-first-notifications-at-linkedin
>>>>>> . done in Samza,
>>>>>> can be replicated with production level guarantees with Flink Queryable
>>>>>> state ( as it stands currently version 1.5 )  ?
>>>>>>
>>>>>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> We are making few decisions on use cases where  Queryable state is a
>>>>>>> natural fit
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>>>>>>>
>>>>>>> Is Queryable state production ready ? We will go to 1.5 Flink if that
>>>>>>> helps to make the case for the usage.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Queryable State

2018-03-19 Thread Fabian Hueske
AFAIK, there have been discussions to replicate state among TMs to speed up
recovery (and improve availability).
However, I'm not aware of plans to implement that.

I don't think serving state while a job is down has been considered yet.

2018-03-19 15:17 GMT+01:00 Vishal Santoshi :

> Are there plans to address all or few of the above apart from the  "JM LB
> not possible" which seems understandable ?
>
> On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske  wrote:
>
>> Queryable state is "just" an additional feature on regular keyed state.
>> i.e., the only difference is that you can read the state from an outside
>> application.
>> Besides that it behaves exactly like regular application state
>>
>> Queryable state is (at the moment) designed to be accessible if a job
>> runs.
>> If the job fails (and recovers) or is manually taken down for
>> maintenance, the state cannot be queried.
>> It's not possible to put a load balancer in front of a JobManager. Only
>> one JM is the active master that maintains a running job.
>> State is also not replicated.
>>
>> Best, Fabian
>>
>>
>> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi :
>>
>>> Those are understandable. I am more interested in a few things ( and may
>>> be more that could be added )
>>>
>>> * As far as I can understand JM is the SPOF. Does HA become a necessity ?
>>> * If there are 2 or more JM could we theoretically have a LB fronting
>>> them ? Thus it is a peer to peer access ( Cassandra ) or a master slave
>>> setup for JM HA specifically for Queryable access ( For  flink jobs it is
>>> master slave )
>>> * Do we replicate state to other TMs for read optimization (
>>> specifically to avoid Hot Node issues ) ?
>>> * If the job goes down it seems the state is not accessible. What plans
>>> to we have to "separate concerns" for Queryable state.
>>>
>>> We consider Queryable State significant a feature Flink provides and
>>> would do the necessary leg work if there are certain gaps in it being
>>> trully considered a Highly Available Key Value store.
>>>
>>> Regards.
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske 
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> In general, Queryable State should be ready to use.
>>>> There are a few things to consider though:
>>>>
>>>> - State queries are not synchronized with the application code, i.e.,
>>>> they can happen at the same time. Therefore, the Flink application should
>>>> not modify objects that have been put into or read from the state if you
>>>> are not using the RocksDBStatebackend (which creates copies by
>>>> deserialization).
>>>> - State will be rolled back after a failure. Hence, you can read writes
>>>> that are not "committed by a checkpoint".
>>>>
>>>> @Kostas, did I forget something?
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>>>>
>>>>> To be more precise, is anything similar to
>>>>> https://engineering.linkedin.com/blog/2018/03/air-traffic-controller--member-first-notifications-at-linkedin
>>>>> . done in Samza,
>>>>> can be replicated with production level guarantees with Flink Queryable
>>>>> state ( as it stands currently version 1.5 )  ?
>>>>>
>>>>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> We are making few decisions on use cases where  Queryable state is a
>>>>>> natural fit
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>>>>>>
>>>>>> Is Queryable state production ready ? We will go to 1.5 Flink if that
>>>>>> helps to make the case for the usage.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Queryable State

2018-03-19 Thread Vishal Santoshi
Are there plans to address all or few of the above apart from the  "JM LB
not possible" which seems understandable ?

On Mon, Mar 19, 2018 at 9:58 AM, Fabian Hueske  wrote:

> Queryable state is "just" an additional feature on regular keyed state.
> i.e., the only difference is that you can read the state from an outside
> application.
> Besides that it behaves exactly like regular application state
>
> Queryable state is (at the moment) designed to be accessible if a job runs.
> If the job fails (and recovers) or is manually taken down for maintenance,
> the state cannot be queried.
> It's not possible to put a load balancer in front of a JobManager. Only
> one JM is the active master that maintains a running job.
> State is also not replicated.
>
> Best, Fabian
>
>
> 2018-03-19 14:24 GMT+01:00 Vishal Santoshi :
>
>> Those are understandable. I am more interested in a few things ( and may
>> be more that could be added )
>>
>> * As far as I can understand JM is the SPOF. Does HA become a necessity ?
>> * If there are 2 or more JM could we theoretically have a LB fronting
>> them ? Thus it is a peer to peer access ( Cassandra ) or a master slave
>> setup for JM HA specifically for Queryable access ( For  flink jobs it is
>> master slave )
>> * Do we replicate state to other TMs for read optimization ( specifically
>> to avoid Hot Node issues ) ?
>> * If the job goes down it seems the state is not accessible. What plans
>> to we have to "separate concerns" for Queryable state.
>>
>> We consider Queryable State significant a feature Flink provides and
>> would do the necessary leg work if there are certain gaps in it being
>> trully considered a Highly Available Key Value store.
>>
>> Regards.
>>
>>
>>
>>
>>
>> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske  wrote:
>>
>>> Hi Vishal,
>>>
>>> In general, Queryable State should be ready to use.
>>> There are a few things to consider though:
>>>
>>> - State queries are not synchronized with the application code, i.e.,
>>> they can happen at the same time. Therefore, the Flink application should
>>> not modify objects that have been put into or read from the state if you
>>> are not using the RocksDBStatebackend (which creates copies by
>>> deserialization).
>>> - State will be rolled back after a failure. Hence, you can read writes
>>> that are not "committed by a checkpoint".
>>>
>>> @Kostas, did I forget something?
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>>>
>>>> To be more precise, is anything thing similar to
>>>> https://engineering.linkedin.com/blog/2018/03/air-traffic
>>>> -controller--member-first-notifications-at-linkedin . done in Samza,
>>>> can be replicated with production level guarantees with Flink Queryable
>>>> state ( as it stands currently version 1.5 )  ?
>>>>
>>>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> We are making few decisions on use cases where  Queryable state is a
>>>>> natural fit https://ci.apache.org/projects
>>>>> /flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>>>>>
>>>>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>>>>> helps to make the case for the usage.
>>>>>


>>>
>>
>


Re: Queryable State

2018-03-19 Thread Fabian Hueske
Queryable state is "just" an additional feature on regular keyed state,
i.e., the only difference is that you can read the state from an outside
application.
Besides that, it behaves exactly like regular application state.

Queryable state is (at the moment) designed to be accessible only while a job is running.
If the job fails (and recovers) or is manually taken down for maintenance,
the state cannot be queried.
It's not possible to put a load balancer in front of a JobManager. Only one
JM is the active master that maintains a running job.
State is also not replicated.
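
Since the state cannot be queried while the job is down or recovering, a client usually has to tolerate failed lookups. A minimal plain-Java sketch of a bounded retry follows; `RetryingQuery`, `queryWithRetry` and the `Callable` standing in for the real state query are all hypothetical names, not part of the Flink API:

```java
import java.util.concurrent.Callable;

/** Retries a query a bounded number of times; all names here are hypothetical. */
final class RetryingQuery {

    /**
     * Calls {@code query} until it succeeds or {@code maxAttempts} is reached,
     * sleeping {@code backoffMillis} between failed attempts.
     */
    static <T> T queryWithRetry(Callable<T> query, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.call();
            } catch (Exception e) {
                last = e; // e.g. the job is restarting and the state is unavailable
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for a real state query: fails twice, then returns a value.
        Callable<Long> flaky = () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("job not running");
            }
            return 42L;
        };
        long count = queryWithRetry(flaky, 5, 10L);
        System.out.println("count=" + count + " after " + calls[0] + " attempts");
    }
}
```

Retrying only hides short outages; it does not change the fact that the state is not replicated.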

Best, Fabian


2018-03-19 14:24 GMT+01:00 Vishal Santoshi :

> Those are understandable. I am more interested in a few things ( and may
> be more that could be added )
>
> * As far as I can understand JM is the SPOF. Does HA become a necessity ?
> * If there are 2 or more JM could we theoretically have a LB fronting them
> ? Thus it is a peer to peer access ( Cassandra ) or a master slave setup
> for JM HA specifically for Queryable access ( For  flink jobs it is master
> slave )
> * Do we replicate state to other TMs for read optimization ( specifically
> to avoid Hot Node issues ) ?
> * If the job goes down it seems the state is not accessible. What plans to
> we have to "separate concerns" for Queryable state.
>
> We consider Queryable State significant a feature Flink provides and would
> do the necessary leg work if there are certain gaps in it being trully
> considered a Highly Available Key Value store.
>
> Regards.
>
>
>
>
>
> On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske  wrote:
>
>> Hi Vishal,
>>
>> In general, Queryable State should be ready to use.
>> There are a few things to consider though:
>>
>> - State queries are not synchronized with the application code, i.e.,
>> they can happen at the same time. Therefore, the Flink application should
>> not modify objects that have been put into or read from the state if you
>> are not using the RocksDBStatebackend (which creates copies by
>> deserialization).
>> - State will be rolled back after a failure. Hence, you can read writes
>> that are not "committed by a checkpoint".
>>
>> @Kostas, did I forget something?
>>
>> Best, Fabian
>>
>>
>>
>> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>>
>>> To be more precise, is anything thing similar to
>>> https://engineering.linkedin.com/blog/2018/03/air-traffic
>>> -controller--member-first-notifications-at-linkedin . done in Samza,
>>> can be replicated with production level guarantees with Flink Queryable
>>> state ( as it stands currently version 1.5 )  ?
>>>
>>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> We are making few decisions on use cases where  Queryable state is a
>>>> natural fit https://ci.apache.org/projects
>>>> /flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
>>>>
>>>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>>>> helps to make the case for the usage.
>>>>
>>>
>>>
>>
>


Re: Queryable State

2018-03-19 Thread Vishal Santoshi
Those are understandable. I am more interested in a few things (and maybe
more could be added):

* As far as I can understand, the JM is the SPOF. Does HA become a necessity?
* If there are 2 or more JMs, could we theoretically have an LB fronting them?
Would that be peer-to-peer access (as in Cassandra) or a master-slave setup
for JM HA specifically for Queryable access (for Flink jobs it is
master-slave)?
* Do we replicate state to other TMs for read optimization (specifically
to avoid hot-node issues)?
* If the job goes down, it seems the state is not accessible. What plans do
we have to "separate concerns" for Queryable state?

We consider Queryable State a significant feature that Flink provides and would
do the necessary legwork if there are certain gaps in it being truly
considered a Highly Available Key Value store.

Regards.





On Mon, Mar 19, 2018 at 5:53 AM, Fabian Hueske  wrote:

> Hi Vishal,
>
> In general, Queryable State should be ready to use.
> There are a few things to consider though:
>
> - State queries are not synchronized with the application code, i.e., they
> can happen at the same time. Therefore, the Flink application should not
> modify objects that have been put into or read from the state if you are
> not using the RocksDBStatebackend (which creates copies by deserialization).
> - State will be rolled back after a failure. Hence, you can read writes
> that are not "committed by a checkpoint".
>
> @Kostas, did I forget something?
>
> Best, Fabian
>
>
>
> 2018-03-18 16:50 GMT+01:00 Vishal Santoshi :
>
>> To be more precise, is anything thing similar to
>> https://engineering.linkedin.com/blog/2018/03/air-traffic
>> -controller--member-first-notifications-at-linkedin . done in Samza, can
>> be replicated with production level guarantees with Flink Queryable state (
>> as it stands currently version 1.5 )  ?
>>
>> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> We are making few decisions on use cases where  Queryable state is a
>>> natural fit https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>> dev/stream/state/queryable_state.html
>>>
>>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>>> helps to make the case for the usage.
>>>
>>
>>
>


Re: Queryable State

2018-03-19 Thread Fabian Hueske
Hi Vishal,

In general, Queryable State should be ready to use.
There are a few things to consider though:

- State queries are not synchronized with the application code, i.e., they
can happen at the same time. Therefore, the Flink application should not
modify objects that have been put into or read from the state if you are
not using the RocksDBStatebackend (which creates copies by deserialization).
- State will be rolled back after a failure. Hence, you can read writes
that are not "committed by a checkpoint".
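
To illustrate the first point: with a heap-based backend a query can observe the very object the application is holding, so a safe pattern is to never mutate a stored object in place. The following is a plain-Java sketch under that assumption; `CopyOnUpdateState` is a hypothetical stand-in for a state slot, not a Flink class, and it assumes a single writer:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Plain-Java stand-in for a heap-backed state slot; not a Flink class. */
final class CopyOnUpdateState {

    // Holds a reference to the stored object, as a heap-based backend would.
    private final AtomicReference<List<String>> slot =
            new AtomicReference<>(Collections.emptyList());

    /** What a concurrent state query would observe. */
    List<String> read() {
        return slot.get();
    }

    /** Safe update: copy, modify the copy, then publish it in one step. */
    void append(String element) {
        List<String> copy = new ArrayList<>(slot.get());
        copy.add(element);
        slot.set(Collections.unmodifiableList(copy));
    }

    public static void main(String[] args) {
        CopyOnUpdateState state = new CopyOnUpdateState();
        state.append("a");
        List<String> seenByQuery = state.read(); // the reader holds the old object
        state.append("b");                       // the writer publishes a new one
        System.out.println(seenByQuery + " vs " + state.read());
    }
}
```

The object a reader already holds is never modified, so a concurrent query cannot see a half-applied update.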

@Kostas, did I forget something?

Best, Fabian



2018-03-18 16:50 GMT+01:00 Vishal Santoshi :

> To be more precise, is anything thing similar to https://engineering.
> linkedin.com/blog/2018/03/air-traffic-controller--member-
> first-notifications-at-linkedin . done in Samza, can be replicated with
> production level guarantees with Flink Queryable state ( as it stands
> currently version 1.5 )  ?
>
> On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> We are making few decisions on use cases where  Queryable state is a
>> natural fit https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/stream/state/queryable_state.html
>>
>> Is Queryable state production ready ? We will go to 1.5 flnk if that
>> helps to make the case for the usage.
>>
>
>


Re: Queryable State

2018-03-18 Thread Vishal Santoshi
To be more precise, can anything similar to
https://engineering.linkedin.com/blog/2018/03/air-traffic-controller--member-first-notifications-at-linkedin
(done in Samza) be replicated with production-level guarantees using
Flink Queryable state (as it currently stands in version 1.5)?

On Fri, Mar 16, 2018 at 5:10 PM, Vishal Santoshi 
wrote:

> We are making few decisions on use cases where  Queryable state is a
> natural fit https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/stream/state/queryable_state.html
>
> Is Queryable state production ready ? We will go to 1.5 flnk if that helps
> to make the case for the usage.
>


Re: queryable state API

2018-02-05 Thread Fabian Hueske
Hi Maciek,

AFAIK, there is some ongoing work to integrate queryable state with the new
FLIP-6 mode.
Maybe Kostas (in CC) who has worked on the queryable state API can help to
answer your questions.

Best, Fabian

2018-02-01 9:19 GMT+01:00 Maciek Próchniak :

> Hello,
>
> Currently (1.4), to be able to use queryable state, the client has to know the
> IP of a (working) task manager and the port. This is a bit awkward, as it
> forces external services to know details of the Flink cluster. It is even more
> complex when we define a port range for the queryable state proxy and we're not
> sure which port is chosen... In former versions it was possible to use only the
> job manager address - I understand it was changed for performance reasons.
>
> Are there plans to make using queryable state a bit easier? E.g. to be
> able to get a list of taskmanagers with their respective queryable state
> proxies via the JobManager REST API? It would be great if external services
> could communicate with the Flink cluster knowing only the jobmanager addresses...
>
> thanks,
>
> maciek
>
>


Re: Queryable State - Count within Time Window

2018-01-10 Thread Velumani Duraisamy
Thank you, Fabian, for the references. This is helpful. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Queryable State in Flink 1.4

2018-01-09 Thread Fabian Hueske
Thanks Boris.
I've filed FLINK-8391 [1] to extend the documentation.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8391

2018-01-05 19:52 GMT+01:00 Boris Lublinsky :

> Thanks
> This was it.
> It would help to have this in documentation along with
> `flink-queryable-state-client`
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Jan 5, 2018, at 11:46 AM, Till Rohrmann  wrote:
>
> Did you add the `flink-queryable-state-runtime` jar as a dependency to
> your project? You can check the log whether a queryable state proxy and
> server have been started.
>
> Cheers,
> Till
>
> On Fri, Jan 5, 2018 at 5:19 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> I also tried to comment out
>>
>> //config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
>>
>> Still no luck.
>> Do you guys have a working example for queryable state for 1.4 somewhere?
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>> On Jan 5, 2018, at 6:33 AM, Till Rohrmann  wrote:
>>
>> Hi Boris,
>>
>> if you start 2 TaskManagers on the same host, then you have to define a
>> port range for the KvState server and the proxy. Otherwise the Flink
>> cluster should not be able to start.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jan 4, 2018 at 11:19 PM, Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> It appears, that queryable state access significantly changed in 1.4
>>> compared to 1.3.
>>>
>>> Documentation on the queryable state client https://ci.apache.org/p
>>> rojects/flink/flink-docs-release-1.4/dev/stream/state/querya
>>> ble_state.html#example
>>> States that the client needs to connect to a proxy port.
>>> My implementation, which I used for 1.3 is enclosed
>>>
>>>
>>> Here I am setting both server port and proxy port.
>>> When I am running it on a localhost and try to do lsof -i:9069 and lsof
>>> -i:9067 It does not show anything using this port.
>>>
>>> Am I missing something?
>>>
>>> As a result my query implementation
>>>
>>>
>>> Return an error - connection refused
>>>
>>> My state is defined in the following class
>>>
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com
>>> https://www.lightbend.com/
>>>
>>>
>>>
>>
>>
>
>


Re: Queryable State - Count within Time Window

2018-01-09 Thread Fabian Hueske
Hi,

you can implement that with a ProcessFunction [1].

The ProcessFunction would have to compute counts at some granularity
(either event-time or processing-time) of records that are processed by the
ProcessFunction in processElement().
I would do this with a MapState that has a timestamp as key and a count as
value. The counts should be stored with some resolution, e.g., every 10
seconds (depends on your requirements).
This means you have one count for all elements that arrive within 10
seconds.

The actual count of the last 5 minutes is stored in a ValueState and
updated at regular intervals using timers.
When updating, you iterate over the MapState and discard all counts that
are older than 5 minutes and compute the new count from the remaining
counts.

The ValueState is configured to be queryable.
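
The bucketing logic described above can be sketched in plain Java, independent of Flink. This is only an illustration: `SlidingWindowCount` is a hypothetical class in which the `TreeMap` plays the role of the MapState, `currentCount` plays the role of the queryable ValueState, and `add`/`refresh` stand in for `processElement()` and the timer callback. Timestamps are assumed to be non-negative milliseconds.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Bucketed sliding-window counter; a plain-Java sketch, not Flink code. */
final class SlidingWindowCount {
    private final long bucketMillis; // resolution, e.g. 10 seconds
    private final long windowMillis; // window length, e.g. 5 minutes
    // Plays the role of the MapState: bucket start timestamp -> count.
    private final TreeMap<Long, Long> buckets = new TreeMap<>();
    // Plays the role of the queryable ValueState.
    private long currentCount;

    SlidingWindowCount(long bucketMillis, long windowMillis) {
        if (bucketMillis <= 0 || windowMillis < bucketMillis) {
            throw new IllegalArgumentException("need 0 < bucketMillis <= windowMillis");
        }
        this.bucketMillis = bucketMillis;
        this.windowMillis = windowMillis;
    }

    /** Analogue of processElement(): count the record in its bucket. */
    void add(long timestampMillis) {
        long bucketStart = timestampMillis - (timestampMillis % bucketMillis);
        buckets.merge(bucketStart, 1L, Long::sum);
    }

    /** Analogue of the timer callback: drop expired buckets, recompute the total. */
    void refresh(long nowMillis) {
        long cutoff = nowMillis - windowMillis;
        long sum = 0;
        Iterator<Map.Entry<Long, Long>> it = buckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> bucket = it.next();
            if (bucket.getKey() + bucketMillis <= cutoff) {
                it.remove(); // the whole bucket lies before the window
            } else {
                sum += bucket.getValue();
            }
        }
        currentCount = sum;
    }

    /** The value an external query would read. */
    long currentCount() {
        return currentCount;
    }

    public static void main(String[] args) {
        SlidingWindowCount counter = new SlidingWindowCount(10_000L, 300_000L);
        counter.add(1_000L);
        counter.add(12_000L);
        counter.add(299_000L);
        counter.refresh(300_000L);
        System.out.println("count in last 5 minutes: " + counter.currentCount());
    }
}
```

The count is only as precise as the bucket size: a bucket is kept until it lies entirely outside the window, so the result can include records up to one bucket older than 5 minutes.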

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html

2018-01-08 15:15 GMT+01:00 Velu Mitwa :

> Hi,
> I want to find the number of events that have happened in the last 5 minutes
> and make that a Queryable state. Is it possible? It would be of great help if
> someone could provide some sample code for this.
>
> Thanks,
> Velu.
>
>


Re: Queryable State in Flink 1.4

2018-01-05 Thread Boris Lublinsky
Thanks 
This was it.
It would help to have this in documentation along with 
`flink-queryable-state-client`

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jan 5, 2018, at 11:46 AM, Till Rohrmann  wrote:
> 
> Did you add the `flink-queryable-state-runtime` jar as a dependency to your 
> project? You can check the log whether a queryable state proxy and server 
> have been started.
> 
> Cheers,
> Till
> 
> On Fri, Jan 5, 2018 at 5:19 PM, Boris Lublinsky wrote:
> I also tried to comment out 
> //config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
> Still no luck.
> Do you guys have a working example for queryable state for 1.4 somewhere?
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
>> On Jan 5, 2018, at 6:33 AM, Till Rohrmann wrote:
>> 
>> Hi Boris,
>> 
>> if you start 2 TaskManagers on the same host, then you have to define a port 
>> range for the KvState server and the proxy. Otherwise the Flink cluster 
>> should not be able to start.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Jan 4, 2018 at 11:19 PM, Boris Lublinsky wrote:
>> It appears, that queryable state access significantly changed in 1.4 
>> compared to 1.3.
>> 
>> Documentation on the queryable state client 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html#example
>>  
>> 
>> States that the client needs to connect to a proxy port.
>> My implementation, which I used for 1.3 is enclosed 
>> 
>> 
>> Here I am setting both server port and proxy port.
>> When I am running it on a localhost and try to do lsof -i:9069 and lsof 
>> -i:9067 It does not show anything using this port.
>> 
>> Am I missing something?
>> 
>> As a result my query implementation 
>> 
>>   
>> Return an error - connection refused
>> 
>> My state is defined in the following class 
>> 
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com 
>> https://www.lightbend.com/ 
>> 
>> 
> 
> 



Re: Queryable State in Flink 1.4

2018-01-05 Thread Till Rohrmann
Did you add the `flink-queryable-state-runtime` jar as a dependency to your
project? You can check the log whether a queryable state proxy and server
have been started.

Cheers,
Till

On Fri, Jan 5, 2018 at 5:19 PM, Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> I also tried to comment out
>
> //config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
>
> Still no luck.
> Do you guys have a working example for queryable state for 1.4 somewhere?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Jan 5, 2018, at 6:33 AM, Till Rohrmann  wrote:
>
> Hi Boris,
>
> if you start 2 TaskManagers on the same host, then you have to define a
> port range for the KvState server and the proxy. Otherwise the Flink
> cluster should not be able to start.
>
> Cheers,
> Till
>
> On Thu, Jan 4, 2018 at 11:19 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> It appears, that queryable state access significantly changed in 1.4
>> compared to 1.3.
>>
>> Documentation on the queryable state client https://ci.apache.org/p
>> rojects/flink/flink-docs-release-1.4/dev/stream/state/querya
>> ble_state.html#example
>> States that the client needs to connect to a proxy port.
>> My implementation, which I used for 1.3 is enclosed
>>
>>
>> Here I am setting both server port and proxy port.
>> When I am running it on a localhost and try to do lsof -i:9069 and lsof
>> -i:9067 It does not show anything using this port.
>>
>> Am I missing something?
>>
>> As a result my query implementation
>>
>>
>> Return an error - connection refused
>>
>> My state is defined in the following class
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>>
>
>


Re: Queryable State in Flink 1.4

2018-01-05 Thread Boris Lublinsky
I also tried to comment out 
//config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
Still no luck.
Do you guys have a working example for queryable state for 1.4 somewhere?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jan 5, 2018, at 6:33 AM, Till Rohrmann  wrote:
> 
> Hi Boris,
> 
> if you start 2 TaskManagers on the same host, then you have to define a port 
> range for the KvState server and the proxy. Otherwise the Flink cluster 
> should not be able to start.
> 
> Cheers,
> Till
> 
> On Thu, Jan 4, 2018 at 11:19 PM, Boris Lublinsky wrote:
> It appears, that queryable state access significantly changed in 1.4 compared 
> to 1.3.
> 
> Documentation on the queryable state client 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html#example
>  
> 
> States that the client needs to connect to a proxy port.
> My implementation, which I used for 1.3 is enclosed 
> 
> 
> Here I am setting both server port and proxy port.
> When I am running it on a localhost and try to do lsof -i:9069 and lsof 
> -i:9067 It does not show anything using this port.
> 
> Am I missing something?
> 
> As a result my query implementation 
> 
>   
> Return an error - connection refused
> 
> My state is defined in the following class 
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
> 
> 



Re: Queryable State in Flink 1.4

2018-01-05 Thread Boris Lublinsky
Thanks Till,
I am probably slow.
I changed the code to the following:

// In a non MiniCluster setup queryable state is enabled by default.
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "50100-50101")
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2)
config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2)

config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "50110-50111")
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2)
config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2)
And lsof still shows nothing 
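
For reference, the idea behind a port range such as "50100-50101" is that each TaskManager on the same host can end up on a different port. Below is a plain-Java sketch assuming first-free-port semantics; `PortRange` is a hypothetical helper and not how Flink itself parses or binds the range:

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Hypothetical helper for a "start-end" port range; not part of Flink. */
final class PortRange {
    final int start;
    final int end;

    private PortRange(int start, int end) {
        if (start < 0 || end > 65535 || start > end) {
            throw new IllegalArgumentException("invalid port range " + start + "-" + end);
        }
        this.start = start;
        this.end = end;
    }

    /** Parses "50100-50101" or a single port such as "9069". */
    static PortRange parse(String spec) {
        String[] parts = spec.trim().split("-");
        if (parts.length > 2) {
            throw new IllegalArgumentException("expected 'start-end' but got " + spec);
        }
        int lo = Integer.parseInt(parts[0].trim());
        int hi = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : lo;
        return new PortRange(lo, hi);
    }

    /** Binds to the first free port in the range, or fails if all are taken. */
    ServerSocket bindFirstFree() throws IOException {
        for (int port = start; port <= end; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException portInUse) {
                // Port is taken (or not bindable); try the next one in the range.
            }
        }
        throw new IOException("no free port in " + start + "-" + end);
    }

    public static void main(String[] args) throws IOException {
        PortRange range = PortRange.parse("50100-50101");
        // Two "servers" on one host each get their own port from the range.
        try (ServerSocket first = range.bindFirstFree();
             ServerSocket second = range.bindFirstFree()) {
            System.out.println(first.getLocalPort() + " and " + second.getLocalPort());
        }
    }
}
```

With a range of two ports, a third bind attempt on the same host would fail, which is why the range has to be at least as large as the number of TaskManagers per host.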




Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jan 5, 2018, at 6:33 AM, Till Rohrmann  wrote:
> 
> Hi Boris,
> 
> if you start 2 TaskManagers on the same host, then you have to define a port 
> range for the KvState server and the proxy. Otherwise the Flink cluster 
> should not be able to start.
> 
> Cheers,
> Till
> 
> On Thu, Jan 4, 2018 at 11:19 PM, Boris Lublinsky wrote:
> It appears, that queryable state access significantly changed in 1.4 compared 
> to 1.3.
> 
> Documentation on the queryable state client 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html#example
>  
> 
> States that the client needs to connect to a proxy port.
> My implementation, which I used for 1.3 is enclosed 
> 
> 
> Here I am setting both server port and proxy port.
> When I am running it on a localhost and try to do lsof -i:9069 and lsof 
> -i:9067 It does not show anything using this port.
> 
> Am I missing something?
> 
> As a result my query implementation 
> 
>   
> Return an error - connection refused
> 
> My state is defined in the following class 
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com 
> https://www.lightbend.com/ 
> 
> 



Re: Queryable State Client - Actor Not found Exception

2018-01-04 Thread Velu Mitwa
Thank you Aljoscha.

I am able to query the state when I use the hostname of the Job Manager instead
of its IP address. But I couldn't understand why it does not work if I give
the IP address.

On Thu, Jan 4, 2018 at 6:42 PM, Aljoscha Krettek 
wrote:

> Hi,
>
> Is my-machine:52650 the correct address for the JobManager running in
> YARN? Also, Queryable State in Flink 1.3.2 is not easy to get to work when
> you use YARN with HA mode.
>
> Best,
> Aljoscha
>
> On 3. Jan 2018, at 16:02, Velu Mitwa  wrote:
>
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in
> Cluster mode (Yarn Session), I am getting Actor not found Exception.
>
> Please help me to understand what is missing.
>
> *Exception Trace*
>
>
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.
> tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:65)
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(Ba
> tchingExecutor.scala:55)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.
> scala:73)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> unbatchedExecute(Future.scala:74)
> at akka.dispatch.BatchingExecutor$class.execute(
> BatchingExecutor.scala:120)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> execute(Future.scala:73)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(
> Promise.scala:40)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
> mise.scala:248)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
> ort.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at scala.concurrent.Future$InternalCallbackExecutor$.scala$
> concurrent$Future$InternalCallbackExecutor$$unbatchedExecute
> (Future.scala:694)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(
> Future.scala:691)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.
> executeTask(Scheduler.scala:474)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.
> executeBucket$1(Scheduler.scala:425)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(
> Scheduler.scala:429)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(
> Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
>
> Client Creation Snippet
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
>     jobManagerHost);
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
>     jobManagerPort);
>
> final HighAvailabilityServices highAvailabilityServices =
>     HighAvailabilityServicesUtils
>         .createHighAvailabilityServices(config,
>             Executors.newSingleThreadScheduledExecutor(),
>             HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
> this.client = new QueryableStateClient(config,
>     highAvailabilityServices);
>   }
>
>
>
>


Re: Queryable State Client - Actor Not found Exception

2018-01-04 Thread Aljoscha Krettek
Hi,

Is my-machine:52650 the correct address for the JobManager running in YARN? 
Also, Queryable State in Flink 1.3.2 is not easy to get to work when you use 
YARN with HA mode.

Best,
Aljoscha 

> On 3. Jan 2018, at 16:02, Velu Mitwa  wrote:
> 
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache 
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in 
> Cluster mode (Yarn Session), I am getting Actor not found Exception. 
> 
> Please help me to understand what is missing. 
> 
> Exception Trace
> 
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/), 
> Path(/user/jobmanager)]
> at 
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> at 
> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
> at 
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> at 
> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
> at 
> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
> 
> Client Creation Snippet 
> 
>  Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
> jobManagerHost);
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
> jobManagerPort);
> 
> final HighAvailabilityServices highAvailabilityServices = 
> HighAvailabilityServicesUtils
> .createHighAvailabilityServices(config, 
> Executors.newSingleThreadScheduledExecutor(),
> 
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
> 
> this.client = new QueryableStateClient(config, highAvailabilityServices);
>   }
> 



Re: Queryable State Python

2017-11-10 Thread Kostas Kloudas
Hi Martin,

I will try to reply to your questions inline:

> On Nov 10, 2017, at 1:59 PM, Martin Eden  wrote:
> 
> Hi,
> 
> Our team is looking at replacing Redis with Flink's own queryable state 
> mechanism. However our clients are using python.
> 
> 1. Is there a python integration with the Flink queryable state mechanism?
> Cannot seem to be able to find one.

There is no Python API for queryable state. Currently only Java is supported.

> 2. If not, is it on the roadmap?

I am not aware of any efforts in that direction, although there are
discussions about porting queryable state to REST, so
that more clients (in any language) can be written.
> 
> 3. Our current solution is to write a Java RPC proxy and query that from 
> python. The whole point of using queryable state was to get rid of an extra 
> service (Redis) but now it seems we need to add another one. Is there an 
> easier way to call queryable state from Python without requiring an extra 
> service?

Unfortunately for now I am not aware of any easier way to do so.

> 
> 4. Is queryable state used in production by anyone? Can anyone share numbers, 
> experiences, case studies?

Queryable state is currently under heavy development, so APIs may change and
features may be added.
For queryable state users, I would recommend checking out the talks from previous
Flink Forward editions. They are all online.

> 
> 5. What is the direction that queryable state is going in for the next Flink 
> release? Features, api?

The next release is going to come soon (it is currently under testing), so you
can already have a look at how Queryable State
is going to look by checking out the current release-1.4 branch of Flink
on GitHub.

> 
> 6. Is the Flink queryable state going to be supported/developed going forward 
> with the advent of Pravega which has caching like capabilities as well?

For this I am cc’ing Stephan. Probably he is more informed.

Hope this helps,
Kostas

> Thanks,
> M



Re: Queryable State

2017-09-17 Thread Navneeth Krishnan
No, it doesn't work even if I increase the timeout.

The state being fetched is a Map of data and has around 100 entries in it.
I have a single job manager and 3 task managers with 16 slots each running
on AWS EC2.

final TypeSerializer keySerializer =
TypeInformation.of(new TypeHint()
{}).createSerializer(new ExecutionConfig());
final TypeSerializer> valueSerializer =
TypeInformation.of(new TypeHint>()
{}).createSerializer(new ExecutionConfig());

final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(
key, keySerializer,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);


On Fri, Sep 15, 2017 at 6:44 AM, Kostas Kloudas  wrote:

> Hi Navneeth,
>
> If you increase the timeout, everything works ok?
> I suppose from your config that you are running in standalone mode, right?
>
> Any other information about the job (e.g. code and/or size of state being
> fetched) and
> the cluster setup that can help us pin down the problem, would be
> appreciated.
>
> Thanks,
> Kostas
>
> On Sep 13, 2017, at 7:12 PM, Navneeth Krishnan 
> wrote:
>
> Hi,
>
> I am sure I have provided the right job manager details because the
> connection timeout ip is the task manager where the state is kept. I guess
> the client is able to reach the job manager and figure out where the state
> is. Also if I provide a wrong state name, I'm receiving unknown state
> exception. I couldn't find why there is a timeout and a warning message is
> logged in the job manager.
>
> On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas 
> wrote:
>
>> Hi,
>>
>>
>> are you sure your jobmanager is running and is accessible from the
>> supplied
>> hostname and port? If you can start up the FLink UI of the job which
>> creates
>> your queryable state, it should have the details of the job manager and
>> the
>> port to be used in this queryable client job.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>
>


Re: Queryable State

2017-09-15 Thread Kostas Kloudas
Hi Navneeth,

If you increase the timeout, does everything work OK?
I suppose from your config that you are running in standalone mode, right?

Any other information about the job (e.g. code and/or size of state being 
fetched) and 
the cluster setup that can help us pin down the problem, would be appreciated.

Thanks,
Kostas

> On Sep 13, 2017, at 7:12 PM, Navneeth Krishnan  
> wrote:
> 
> Hi,
> 
> I am sure I have provided the right job manager details because the 
> connection timeout ip is the task manager where the state is kept. I guess 
> the client is able to reach the job manager and figure out where the state 
> is. Also if I provide a wrong state name, I'm receiving unknown state 
> exception. I couldn't find why there is a timeout and a warning message is 
> logged in the job manager.
> 
> On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas  > wrote:
> Hi,
> 
> 
> are you sure your jobmanager is running and is accessible from the supplied
> hostname and port? If you can start up the FLink UI of the job which creates
> your queryable state, it should have the details of the job manager and the
> port to be used in this queryable client job.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 



Re: Queryable State

2017-09-14 Thread Navneeth Krishnan
Hi,

Any idea on how to solve this issue?

Thanks

On Wed, Sep 13, 2017 at 10:12 AM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hi,
>
> I am sure I have provided the right job manager details because the
> connection timeout ip is the task manager where the state is kept. I guess
> the client is able to reach the job manager and figure out where the state
> is. Also if I provide a wrong state name, I'm receiving unknown state
> exception. I couldn't find why there is a timeout and a warning message is
> logged in the job manager.
>
> On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas 
> wrote:
>
>> Hi,
>>
>>
>> are you sure your jobmanager is running and is accessible from the
>> supplied
>> hostname and port? If you can start up the FLink UI of the job which
>> creates
>> your queryable state, it should have the details of the job manager and
>> the
>> port to be used in this queryable client job.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Queryable State

2017-09-13 Thread Navneeth Krishnan
Hi,

I am sure I have provided the right job manager details, because the IP in
the connection timeout is that of the task manager where the state is kept.
I guess the client is able to reach the job manager and figure out where the
state is. Also, if I provide a wrong state name, I receive an unknown state
exception. I couldn't find out why there is a timeout and why a warning
message is logged in the job manager.
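
Since the address in the exception belongs to a task manager rather than the job manager, one thing worth ruling out is plain network reachability of that host and port from the client machine. The following is a minimal sketch using only the JDK. `PortProbe` and `canConnect` are hypothetical names, not part of Flink, and a successful connection only shows that the port accepts TCP connections, not that the state server behind it is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic helper; not part of Flink. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, unreachable hosts and timeouts
            // (SocketTimeoutException is a subclass of IOException).
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the address reported in the exception, e.g. 172.31.18.170 43537.
        // The defaults below are placeholders only.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6123;
        System.out.println(host + ":" + port + " reachable: "
                + canConnect(host, port, 3000));
    }
}
```

If the probe fails from the client host but succeeds from a machine inside the cluster, a firewall or security group between the client and the task managers would be a plausible cause. That is an assumption to verify, not something confirmed in this thread.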

On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas 
wrote:

> Hi,
>
>
> are you sure your jobmanager is running and is accessible from the supplied
> hostname and port? If you can start up the FLink UI of the job which
> creates
> your queryable state, it should have the details of the job manager and the
> port to be used in this queryable client job.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


RE: Queryable State

2017-09-13 Thread Marchant, Hayden
I can see the job running in the Flink UI, and I explicitly specified the port 
for the Job Manager. When I provided a different port, I got an akka exception. 
Here, it seems that the code gets further. I think it might be connected with 
how I am creating the StateDescriptor. What exactly does it mean when the 
KvStateLocation can't be found?

-Original Message-
From: Biplob Biswas [mailto:revolutioni...@gmail.com] 
Sent: Wednesday, September 13, 2017 2:20 PM
To: user@flink.apache.org
Subject: Re: Queryable State

Hi, 


are you sure your jobmanager is running and is accessible from the supplied 
hostname and port? If you can start up the FLink UI of the job which creates 
your queryable state, it should have the details of the job manager and the 
port to be used in this queryable client job.



--
Sent from: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
 


Re: Queryable State

2017-09-13 Thread Biplob Biswas
Hi, 


Are you sure your jobmanager is running and is accessible from the supplied
hostname and port? If you can open the Flink UI of the job which creates
your queryable state, it should show the details of the job manager and the
port to be used in this queryable client job.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Queryable State

2017-09-11 Thread Navneeth Krishnan
Hi All,

Any suggestions would really be helpful. Thanks

On Sun, Sep 10, 2017 at 12:04 AM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hi All,
>
> I'm running a streaming job on Flink 1.3.2 with a few queryable states.
> There are 3 task managers and a job manager. I'm getting a timeout exception
> when trying to query a state and also a warning message in the job manager
> log.
>
> *Client:*
> final Configuration config = new Configuration();
>
> config.setString(JobManagerOptions.ADDRESS, jobMgrHost);
> config.setInteger(JobManagerOptions.PORT, 
> JobManagerOptions.PORT.defaultValue());
>
> final HighAvailabilityServices highAvailabilityServices =
> HighAvailabilityServicesUtils.createHighAvailabilityServices(
> config,
> Executors.newSingleThreadScheduledExecutor(),
> 
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
> QueryableStateClient client = new QueryableStateClient(config, 
> highAvailabilityServices);
>
>
> *Exception:*
> Exception in thread "main" io.netty.channel.ConnectTimeoutException:
> connection timed out: /172.31.18.170:43537
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(
> AbstractNioChannel.java:212)
> at io.netty.util.concurrent.PromiseTask$RunnableAdapter.
> call(PromiseTask.java:38)
> at io.netty.util.concurrent.ScheduledFutureTask.run(
> ScheduledFutureTask.java:120)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(
> SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> *Job Manager:*
> 2017-09-10 06:55:41,599 WARN  akka.remote.ReliableDeliverySupervisor
> - Association with remote system [akka.tcp://flink@127.0.0.1:64344] has
> failed, address is now gated for [5000] ms. Reason: [Disassociated]
>
> Thanks,
> Navneeth
>
>
>


Re: Queryable State max number of clients

2017-08-14 Thread Ziyad Muhammed
Hi Aljoscha, Ufuk

Thank you for the replies.

I'm using the RocksDB state backend. Could you please explain the blocking I/O
calls mentioned?
When will they happen, and what will the effect be? A timeout exception?


Best
Ziyad

On Mon, Aug 14, 2017 at 10:17 PM, Ufuk Celebi  wrote:

> This is as Aljoscha describes. Each thread can handle many different
> clients at the same time. You shouldn't need to change the defaults in
> most cases.
>
> The network threads handle the TCP connections and dispatch query
> tasks to the query threads which do the actual querying of the state
> backend. In case of the RocksDB backend for example this might involve
> blocking I/O calls.
>
> – Ufuk
>
> On Mon, Aug 14, 2017 at 12:35 PM, Aljoscha Krettek 
> wrote:
> > Hi,
> >
> > I think the number of network threads and number of query threads only
> > roughly correlate with the number of clients that can query in parallel
> > since this is using asynchronous communication via Akka/Netty. Of course,
> > increasing that number means there can be more connections but I think
> > even just 1 thread for each of those should be able to easily handle
> > multiple queries at the same time.
> >
> > I'm cc'ing Ufuk and Kostas who might know more about this.
> >
> > Best,
> > Aljoscha
> >
> > On 9. Aug 2017, at 17:19, Ziyad Muhammed  wrote:
> >
> > Hi all,
> >
> > I'm trying to understand how many parallel clients will be supported by
> the
> > queryable state.
> >
> > query.server.network-threads: number of network (event loop) threads for
> the
> > KvStateServer (0 => #slots)
> > query.server.query-threads: number of asynchronous query threads for the
> > KvStateServerHandler (0 => #slots).
> >
> > so, if I choose 0 for both these parameters, what will be the maximum
> number
> > of parallel clients supported?
> >
> > I tried more parallel clients than number of slots, but all of them were
> > able to query the state in parallel. Can someone help me to understand
> the
> > logic here?
> >
> > Thanks in advance.
> >
> > Best
> > Ziyad
> >
> >
>


Re: Queryable State max number of clients

2017-08-14 Thread Ufuk Celebi
This is as Aljoscha describes. Each thread can handle many different
clients at the same time. You shouldn't need to change the defaults in
most cases.

The network threads handle the TCP connections and dispatch query
tasks to the query threads which do the actual querying of the state
backend. In case of the RocksDB backend for example this might involve
blocking I/O calls.

– Ufuk
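
The two-stage design described above can be illustrated without Flink. The sketch below is a stand-in built on the JDK's executors, not Flink's actual implementation: the class name, the pool sizes and the simulated `lookup` are all assumptions made for illustration. It shows why the number of threads does not cap the number of clients: requests are queued as tasks, so one network thread and one query thread can still serve many requests, only with higher latency when a lookup blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration of a network pool handing work to a query pool. */
final class TwoStagePoolSketch {

    /** Serves `clients` simulated requests and returns how many completed. */
    static int serve(int clients, int networkThreads, int queryThreads) {
        ExecutorService network = Executors.newFixedThreadPool(networkThreads);
        ExecutorService query = Executors.newFixedThreadPool(queryThreads);
        try {
            List<CompletableFuture<String>> responses = new ArrayList<>();
            for (int i = 0; i < clients; i++) {
                final int clientId = i;
                // Stage 1: a network thread decodes the request (cheap).
                // Stage 2: a query thread performs the lookup, which may block.
                responses.add(CompletableFuture
                        .supplyAsync(() -> "key-" + clientId, network)
                        .thenApplyAsync(TwoStagePoolSketch::lookup, query));
            }
            int completed = 0;
            for (CompletableFuture<String> response : responses) {
                if (response.join().startsWith("value-for-")) {
                    completed++;
                }
            }
            return completed;
        } finally {
            network.shutdown();
            query.shutdown();
        }
    }

    /** Stand-in for a state backend read; sleeps briefly to mimic blocking I/O. */
    static String lookup(String key) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-for-" + key;
    }

    public static void main(String[] args) {
        System.out.println(serve(100, 1, 1) + " of 100 requests served");
    }
}
```

In this model a slow lookup occupies a query thread while later requests wait in the queue. If the real server behaves similarly, the visible effect of blocking I/O would be higher response times and, past some load, client-side timeouts. That conclusion is an inference from the sketch, not a statement made in the thread.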

On Mon, Aug 14, 2017 at 12:35 PM, Aljoscha Krettek  wrote:
> Hi,
>
> I think the number of network threads and number of query threads only
> roughly correlate with the number of clients that can query in parallel
> since this is using asynchronous communication via Akka/Netty. Of course,
> increasing that number means there can be more connections but I think even
> just 1 thread for each of those should be able to easily handle multiple
> queries at the same time.
>
> I'm cc'ing Ufuk and Kostas who might know more about this.
>
> Best,
> Aljoscha
>
> On 9. Aug 2017, at 17:19, Ziyad Muhammed  wrote:
>
> Hi all,
>
> I'm trying to understand how many parallel clients will be supported by the
> queryable state.
>
> query.server.network-threads: number of network (event loop) threads for the
> KvStateServer (0 => #slots)
> query.server.query-threads: number of asynchronous query threads for the
> KvStateServerHandler (0 => #slots).
>
> so, if I choose 0 for both these parameters, what will be the maximum number
> of parallel clients supported?
>
> I tried more parallel clients than number of slots, but all of them were
> able to query the state in parallel. Can someone help me to understand the
> logic here?
>
> Thanks in advance.
>
> Best
> Ziyad
>
>


Re: Queryable State max number of clients

2017-08-14 Thread Aljoscha Krettek
Hi,

I think the number of network threads and the number of query threads only 
roughly correlate with the number of clients that can query in parallel, since 
this uses asynchronous communication via Akka/Netty. Of course, increasing 
those numbers means there can be more connections, but I think even just 1 
thread for each of those should easily handle multiple queries at the same time.

I'm cc'ing Ufuk and Kostas who might know more about this.

Best,
Aljoscha

> On 9. Aug 2017, at 17:19, Ziyad Muhammed  wrote:
> 
> Hi all,
> 
> I'm trying to understand how many parallel clients will be supported by the 
> queryable state. 
> 
> query.server.network-threads: number of network (event loop) threads for the 
> KvStateServer (0 => #slots)
> query.server.query-threads: number of asynchronous query threads for the 
> KvStateServerHandler (0 => #slots).
> so, if I choose 0 for both these parameters, what will be the maximum number 
> of parallel clients supported?
> 
> I tried more parallel clients than number of slots, but all of them were able 
> to query the state in parallel. Can someone help me to understand the logic 
> here?
> 
> Thanks in advance.
> 
> Best
> Ziyad



Re: queryable state vs. writing result back to Kafka

2017-08-09 Thread Aljoscha Krettek
One advantage is that you don't need an external system for storing results if 
they are readily available in Flink. There are also downsides, however. For 
example, queryable state is not available if the Job is suspended or restarting.

Generally, I think this depends on the use case.

Best,
Aljoscha

> On 5. Aug 2017, at 13:00, Georg Heiler  wrote:
> 
> What is the advantage of queryable state compared to writing the result back 
> to Kafka?
> 
> regards,
> Georg



Re: Queryable State

2017-06-14 Thread Nico Kruber
Hi Chet,
you should not see a 
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation when querying an 
existing(!) key.
However, if you query a key the non-registered TaskManager is responsible for, 
I suppose this is the exception you will get. Unfortunately, the queryable 
state API still seems to be rough around the edges.

I suspect that the TaskManagers register their queryable state only after 
receiving data(?), and that this causes the UnknownKvStateKeyGroupLocation 
instead of an UnknownKeyOrNamespace.


Nico
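
To make the skew effect from the quoted message below concrete, here is a rough sketch of how keys map to key groups and key groups to parallel subtasks. The integer division in `operatorIndexFor` mirrors, to the best of my knowledge, the formula Flink uses. `keyGroupFor` is a deliberate simplification (a plain modulo instead of Flink's murmur hash), and the maximum parallelism of 128 is an assumed value, so real placements will differ from the ones computed here.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of key -> key group -> subtask placement. */
final class KeyGroupSketch {

    /** Simplified stand-in: Flink additionally applies a murmur hash to key.hashCode(). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel subtask that owns it. */
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Returns the subtask indices that receive at least one of the given keys. */
    static Set<Integer> subtasksWithData(Iterable<?> keys, int parallelism, int maxParallelism) {
        Set<Integer> subtasks = new TreeSet<>();
        for (Object key : keys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            subtasks.add(operatorIndexFor(keyGroup, parallelism, maxParallelism));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Integer keys hash to themselves, so the placement can be checked by hand:
        // key groups 1, 50 and 60 map to subtasks 0, 1 and 1 with parallelism 3.
        System.out.println(subtasksWithData(Arrays.asList(1, 50, 60), 3, 128));
    }
}
```

With these three keys only subtasks 0 and 1 ever see data. Under the assumption that a subtask registers its state only once it has received data, a query for a key owned by subtask 2 would find no registered location for that key group.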

On Thursday, 4 May 2017 20:05:29 CEST Chet Masterson wrote:
> I found the issue. When parallelism = 3, my test data set was skewed such
> that data was only going to two of the three task managers (kafka partition
> = 3, number of flink nodes = 3, parallelism = 3). As soon as I created a
> test data set with enough keys that spread across all three task managers,
> queryable state started working as expected. That is why only two KVStates
> were registered with the job manager, instead of three. 
> My FINAL :-) question: should I be getting
> org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event
> only N-1 task managers have data in a parallelism of N situation? 
> Thanks for all the help!
>  
>  
> 04.05.2017, 11:24, "Ufuk Celebi" :
> Could you try KvStateRegistry#registerKvState please?
> 
> In the JM logs you should see something about the number of connected
> task managers and in the task manager logs that each one connects to a
> JM.
> 
> – Ufuk
> 
> 
> On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
>  wrote:
> 
>  Can do. Any advice on where the trace prints should go in the task manager
>  source code?
> 
>  BTW - How do I know I have a correctly configured cluster? Is there a set
> of messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
> 
>  Thanks!
> 
> 
>  02.05.2017, 06:03, "Ufuk Celebi" :
> 
>  Hey Chet! I'm wondering why you are only seeing 2 registration
>  messages for 3 task managers. Unfortunately, there is no log message
>  at the task managers when they send out the notification. Is it
>  possible for you to run a remote debugger with the task managers or
>  build a custom Flink version with the appropriate log messages on the
>  task manager side?
>  – Ufuk
> 
> 
>  On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
>   wrote:
> 
> 
> 
>   Any insight here? I've got a situation where a key value state on a task
>   manager is being registered with the job manager, but when I try to query
>   it, the job manager responds it doesn't know the location of the key value
> state...
> 
> 
>   26.04.2017, 12:11, "Chet Masterson" :
> 
>   After setting the logging to DEBUG on the job manager, I learned four
>   things:
> 
>   (On the message formatting below, I have the Flink logs formatted into
> JSON so I can import them into Kibana)
> 
>   1. The appropriate key value state is registered in both parallelism = 1
>  and
>   parallelism = 3 environments. In parallelism = 1, I saw one registration
>   message in the log, in the parallelism = 3, I saw two registration
>  messages:
>   {"level":"DEBUG","time":"2017-04-26
> 
>  15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc
> ":"", "msg":"Key value state registered for job  under name
> "}
> 
>   2. When I issued the query in both parallelism = 1 and parallelism = 3
>   environments, I saw "Lookup key-value state for job  with
>   registration name ". In parallelism = 1, I saw 1 log message,
> in parallelism = 3, I saw two identical messages.
> 
>   3. I saw no other messages in the job manager log that seemed relevant.
> 
>   4. When issuing the query in parallelism = 3, I continued to get the
> error: org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a
> message
>   of null.
> 
>   Thanks!
> 
> 
> 
> 
> 
>   26.04.2017, 09:52, "Ufuk Celebi" :
> 
>   Thanks! Your config looks good to me.
> 
>   Could you please set the log level org.apache.flink.runtime.jobmanager to
>   DEBUG?
> 
>   log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
> 
>   Then we can check whether the JobManager logs the registration of the
>   state instance with the respective name in the case of parallelism >
>   1?
> 
>   Expected output is something like this: "Key value state registered
>   for job ${msg.getJobId} under name ${msg.getRegistrationName}."
> 
>   – Ufuk
> 
>   On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
>    wrote:
> 
>Ok...more information.
> 
>1. Built a fresh cluster from the ground up. Started testing queryable
>   state
>

Re: Queryable State Client with 1.3.0-rc0

2017-06-07 Thread Aljoscha Krettek
Hi Claudio,

Quick question: what exactly was your call for getting the local environment 
with web UI? Did you also have a custom Configuration where you specified, for 
example, that the queryable state server should be enabled?

I can make an example work where I start a local cluster in one process (in the 
IDE) and then query from another process (also started in the IDE) but only if 
I manually start the LocalFlinkMiniCluster, as outlined in my last mail. I’m 
talking about Flink 1.2.x here.

Best,
Aljoscha

> On 6. Jun 2017, at 17:23, Aljoscha Krettek  wrote:
> 
> Hi Claudio,
> 
> Quick follow up: querying a locally started cluster does not work out-of-box 
> anymore in Flink 1.3. You can manually start a mini cluster that has the 
> required settings, though. You would do something like this:
> 
> Configuration configuration = new Configuration();
> configuration.addAll(jobGraph.getJobConfiguration());
> 
> configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
> configuration.setString(JobManagerOptions.ADDRESS, "localhost");
> configuration.setInteger(JobManagerOptions.PORT, 6123);
> configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
> 
> flinkMiniCluster = new LocalFlinkMiniCluster(
>configuration,
>HighAvailabilityServicesUtils.createHighAvailabilityServices(
>configuration,
>Executors.directExecutor(),
>
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
>false);
> 
> flinkMiniCluster.start();
> 
> And then you can create a remote StreamExecutionEnvironment using 
> StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to 
> that cluster.
> 
> You can stop the cluster using flinkMiniCluster.stop()
> 
> I hope this helps?
> 
> Best,
> Aljoscha
> 
>> On 6. Jun 2017, at 16:33, Aljoscha Krettek  wrote:
>> 
>> Hi Claudio,
>> 
>> The documentation for this was recently updated: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state.
>>  Could you see if that helps? The important bit for you is probably this:
>> 
>> final HighAvailabilityServices highAvailabilityServices =
>>  HighAvailabilityServicesUtils.createHighAvailabilityServices(
>>   config,
>>   Executors.newSingleThreadScheduledExecutor(),
>>   
>> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>> 
>> If that doesn’t help we’ll need to delve deeper.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 11. May 2017, at 22:21, Fahey, Claudio  wrote:
>>> 
>>> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now 
>>> upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a 
>>> HighAvailabilityServices parameter. The documentation hasn’t been updated 
>>> on using HighAvailabilityServices so I’m a bit lost on what exactly I 
>>> should specify for that parameter. For development, I want to connect to a 
>>> Flink Job Manager that I created from a different process using 
>>> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody 
>>> provide the code needed to create the appropriate HighAvailabilityServices 
>>> parameter?
>>> 
>>> I have tried the following code:
>>> 
>>>  val jobManagerIpcAddress = “localhost”
>>>  val jobManagerIpcPort = 6123
>>>  configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>>>  configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>>>  private val highAvailabilityServices = new 
>>> StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>>>  private val client = new QueryableStateClient(configuration, 
>>> highAvailabilityServices)
>>> 
>>> It results in:
>>> 
>>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: 
>>> ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>>> 
>>> 
>>> Claudio Fahey
>>> Chief Solutions Architect, Analytics
>>> Dell EMC | Emerging Technologies Team
>>> 
>> 
> 



Re: Queryable State Client with 1.3.0-rc0

2017-06-06 Thread Aljoscha Krettek
Hi Claudio,

Quick follow-up: querying a locally started cluster does not work out of the 
box anymore in Flink 1.3. You can manually start a mini cluster that has the 
required settings, though. You would do something like this:

// jobGraph and flinkMiniCluster are assumed to be declared elsewhere.
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());

configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 6123);
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

flinkMiniCluster = new LocalFlinkMiniCluster(
    configuration,
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        Executors.directExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
    false);

flinkMiniCluster.start();

And then you can create a remote StreamExecutionEnvironment using 
StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to that 
cluster.

You can stop the cluster using flinkMiniCluster.stop()

I hope this helps?

Best,
Aljoscha

> On 6. Jun 2017, at 16:33, Aljoscha Krettek  wrote:
> 
> Hi Claudio,
> 
> The documentation for this was recently updated: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state.
>  Could you see if that helps? The important bit for you is probably this:
> 
> final HighAvailabilityServices highAvailabilityServices =
>   HighAvailabilityServicesUtils.createHighAvailabilityServices(
>config,
>Executors.newSingleThreadScheduledExecutor(),
>
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
> 
> If that doesn’t help we’ll need to delve deeper.
> 
> Best,
> Aljoscha
> 
>> On 11. May 2017, at 22:21, Fahey, Claudio  wrote:
>> 
>> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now 
>> upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a 
>> HighAvailabilityServices parameter. The documentation hasn’t been updated on 
>> using HighAvailabilityServices so I’m a bit lost on what exactly I should 
>> specify for that parameter. For development, I want to connect to a Flink 
>> Job Manager that I created from a different process using 
>> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody 
>> provide the code needed to create the appropriate HighAvailabilityServices 
>> parameter?
>>  
>> I have tried the following code:
>>  
>>   val jobManagerIpcAddress = “localhost”
>>   val jobManagerIpcPort = 6123
>>   configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>>   configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>>   private val highAvailabilityServices = new 
>> StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>>   private val client = new QueryableStateClient(configuration, 
>> highAvailabilityServices)
>>  
>> It results in:
>>  
>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: 
>> ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>>  
>>  
>> Claudio Fahey
>> Chief Solutions Architect, Analytics
>> Dell EMC | Emerging Technologies Team
>> 
> 



Re: Queryable State Client with 1.3.0-rc0

2017-06-06 Thread Aljoscha Krettek
Hi Claudio,

The documentation for this was recently updated:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state
Could you see if that helps? The important bit for you is probably this:

final HighAvailabilityServices highAvailabilityServices =
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        config,
        Executors.newSingleThreadScheduledExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

If that doesn’t help we’ll need to delve deeper.

Best,
Aljoscha

> On 11. May 2017, at 22:21, Fahey, Claudio  wrote:
> 
> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now 
> upgraded to release-1.3.0-rc0 and QueryableStateClient now requires a 
> HighAvailabilityServices parameter. The documentation hasn’t been updated on 
> using HighAvailabilityServices so I’m a bit lost on what exactly I should 
> specify for that parameter. For development, I want to connect to a Flink Job 
> Manager that I created from a different process using 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody 
> provide the code needed to create the appropriate HighAvailabilityServices 
> parameter?
>  
> I have tried the following code:
>  
>   val jobManagerIpcAddress = “localhost”
>   val jobManagerIpcPort = 6123
>   configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>   configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>   private val highAvailabilityServices = new 
> StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>   private val client = new QueryableStateClient(configuration, 
> highAvailabilityServices)
>  
> It results in:
>  
> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>  
>  
> Claudio Fahey
> Chief Solutions Architect, Analytics
> Dell EMC | Emerging Technologies Team
> 



Re: Queryable state in a keyed stream not querying properly

2017-06-06 Thread Aljoscha Krettek
Hi Philip,

The JobManager should figure out the correct TaskManager for your query based 
on the key. You mentioned that you get the result 1/3 of the time, is this 1/3 
of the time for queries with exactly the same key or for queries with different 
keys? 

Also, could it be that the state you’re trying to access is simply not there 
(yet)?

It might very well be that you discovered a bug. The queryable state feature 
hasn’t been touched for a while, but we’re planning to work on it for the 1.4 
release cycle.

Best,
Aljoscha
> On 19. May 2017, at 04:29, Philip Doctor  wrote:
> 
> Dear Flink Users,
> I’m getting started with Flink and I’ve bumped into a small problem.  I have 
> a keyed stream like this:
>  
> val stream = env.addSource(consumer)
>   .flatMap(new ValidationMap()).name("ValidationMap")
>   .keyBy(x => (x.getObj.foo(), x.getObj.bar(), x.getObj.baz()))
>   .flatMap(new Calculator(this.config.size, 
> this.config.queryableStateName)).name(jobname)
>  
>  
> Within my stream I have a ValueState that I use to maintain a list.
>  
> I then use the QueryableStateClient to
> client.getKvState(flinkJobID, stateName, serializedKey.hashCode(), 
> serializedKey);
>  
> Where the “serializedKey” matches the .keyBy on the keyed stream.
>  
> When I query the state things go wrong.  I’ve determined that the JobManager 
> appears to send my query to one of the three TaskManagers I have running, so 
> about 1/3 of the time I get the proper result and the other 2/3 of the time I 
> get 
>  
> org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace: KvState does not 
> hold any state for key/namespace.
>  
> I feel like I must have somehow misconfigured my job, how can I instruct the 
> job manager to properly query the TaskManager that has my data?  Is there a 
> specific setting or configuration I’m missing?
>  
> Thank you for your time.
>  
> -Phil



Re: Queryable State

2017-05-04 Thread Chet Masterson
I found the issue. When parallelism = 3, my test data set was skewed such
that data was only going to two of the three task managers (kafka
partitions = 3, number of flink nodes = 3, parallelism = 3). As soon as I
created a test data set with enough keys to spread across all three task
managers, queryable state started working as expected. That is why only
two KvStates were registered with the job manager, instead of three.

My FINAL :-) question: should I be getting
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event
that only N-1 task managers have data in a parallelism of N situation?

Thanks for all the help!


04.05.2017, 11:24, "Ufuk Celebi" :
> Could you try KvStateRegistry#registerKvState please?
>
> In the JM logs you should see something about the number of connected
> task managers and in the task manager logs that each one connects to a
> JM.
>
> – Ufuk
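
The skew described in this message can be illustrated with a small sketch. As far as I understand it, Flink assigns each key to a key group and each key group to one parallel subtask, so a handful of keys can easily leave a subtask without any state to register. The class below is only an illustration under stated assumptions: `keyGroupFor` uses a stand-in hash (not Flink's actual murmur hash), the max parallelism of 128 is assumed, and all names are hypothetical. Only the key-group-to-subtask formula is meant to mirror what Flink does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class KeyGroupSkewSketch {

    /** Stand-in key hashing; NOT Flink's real hash, used only to spread keys. */
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, maxParallelism);
    }

    /** Maps a key group to the index of the parallel subtask that owns it. */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Returns the subtask indices that end up holding state for the given keys. */
    static Set<Integer> subtasksWithState(List<String> keys, int parallelism, int maxParallelism) {
        Set<Integer> used = new TreeSet<>();
        for (String key : keys) {
            used.add(subtaskFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        int maxParallelism = 128; // assumed value for a small job

        // With fewer distinct keys than subtasks, at least one subtask stays empty.
        List<String> fewKeys = Arrays.asList("sensor-a", "sensor-b");
        System.out.println("subtasks holding state: "
                + subtasksWithState(fewKeys, parallelism, maxParallelism)
                + " of " + parallelism);
    }
}
```

With two keys and three subtasks, at most two subtasks can hold state, which matches the two registrations seen in the job manager log. A query for a key whose key group belongs to the empty subtask then has no registered location to resolve.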

Re: Queryable State

2017-05-04 Thread Ufuk Celebi
Could you try KvStateRegistry#registerKvState please?

In the JM logs you should see something about the number of connected
task managers and in the task manager logs that each one connects to a
JM.

– Ufuk


On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
 wrote:
> Can do. Any advice on where the trace prints should go in the task manager
> source code?
>
> BTW - How do I know I have a correctly configured cluster? Is there a set of
> messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
>
> Thanks!
>
>
> 02.05.2017, 06:03, "Ufuk Celebi" :
>
> Hey Chet! I'm wondering why you are only seeing 2 registration
> messages for 3 task managers. Unfortunately, there is no log message
> at the task managers when they send out the notification. Is it
> possible for you to run a remote debugger with the task managers or
> build a custom Flink version with the appropriate log messages on the
> task manager side?
> – Ufuk
>
>
> On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
>  wrote:
>
>
>
>  Any insight here? I've got a situation where a key value state on a task
>  manager is being registered with the job manager, but when I try to query
>  it, the job manager responds it doesn't know the location of the key value
>  state...
>
>
>  26.04.2017, 12:11, "Chet Masterson" :
>
>  After setting the logging to DEBUG on the job manager, I learned four
>  things:
>
>  (On the message formatting below, I have the Flink logs formatted into JSON
>  so I can import them into Kibana)
>
>  1. The appropriate key value state is registered in both parallelism = 1
>  and parallelism = 3 environments. In parallelism = 1, I saw one
>  registration message in the log; in parallelism = 3, I saw two
>  registration messages:
>  {"level":"DEBUG","time":"2017-04-26 15:54:55,254",
>  "class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"",
>  "msg":"Key value state registered for job  under name "}
>
>  2. When I issued the query in both parallelism = 1 and parallelism = 3
>  environments, I saw "Lookup key-value state for job  with
>  registration name ". In parallelism = 1, I saw 1 log message, in
>  parallelism = 3, I saw two identical messages.
>
>  3. I saw no other messages in the job manager log that seemed relevant.
>
>  4. When issuing the query in parallelism = 3, I continued to get the error:
>  org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a
>  message of null.
>
>  Thanks!
>
>
>
>
>
>  26.04.2017, 09:52, "Ufuk Celebi" :
>
>  Thanks! Your config looks good to me.
>
>  Could you please set the log level org.apache.flink.runtime.jobmanager to
>  DEBUG?
>
>  log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
>
>  Then we can check whether the JobManager logs the registration of the
>  state instance with the respective name in the case of parallelism >
>  1?
>
>  Expected output is something like this: "Key value state registered
>  for job ${msg.getJobId} under name ${msg.getRegistrationName}."
>
>  – Ufuk
>
>  On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
>   wrote:
>
>   Ok...more information.
>
>   1. Built a fresh cluster from the ground up. Started testing queryable
>   state at each step.
>   2. When running under any configuration of task managers and job managers
>   where parallelism = 1, the queries execute as expected.
>   3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
>   manager) feeding off a kafka topic partitioned three ways, queries will
>   always fail, returning error
>   (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
>   error message of null.
>   4. I do know my state is as expected on the cluster. Liberal use of trace
>   prints shows my state managed on the jobs is as I expect. However, I cannot
>   query it externally.
>   5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
>   is configured by using the job manager UI.
>   6. My flink-conf.yaml:
>
>   jobmanager.rpc.address: flink01
>   jobmanager.rpc.port: 6123
>   jobmanager.heap.mb: 256
>
>   taskmanager.heap.mb: 512
>   taskmanager.data.port: 6121
>   taskmanager.numberOfTaskSlots: 1
>   taskmanager.memory.preallocate: false
>
>   parallelism.default: 1
>   blob.server.port: 6130
>
>   jobmanager.web.port: 8081
>   query.server.enable: true
>
>   7. I do know my job is indeed running in parallel, from trace prints going
>   to the task manager logs.
>
>   Do I need a backend configured when running in parallel for the queryable
>   state? Do I need a shared temp directory on the task managers?
>
>   THANKS!
>
>
>   
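
The diagnostic used in the quoted messages (comparing the number of "Key value state registered" DEBUG lines in the job manager log with the job's parallelism) can be sketched roughly as follows. This is only an illustration: the message prefix is taken from the log output quoted above, while the class name, method names, and the sample job and state names are hypothetical. It also assumes one registration line per subtask that actually holds state, which is what the thread observed rather than a documented guarantee.

```java
import java.util.Arrays;
import java.util.List;

public class RegistrationLogCheck {

    // Message prefix as it appears in the job manager DEBUG output quoted in this thread.
    static final String REGISTERED = "Key value state registered for job";

    /** Counts registration lines for the given state name (simple substring match). */
    static long countRegistrations(List<String> logLines, String stateName) {
        return logLines.stream()
                .filter(line -> line.contains(REGISTERED))
                .filter(line -> line.contains("under name " + stateName))
                .count();
    }

    /** True if at least one registration per parallel subtask was logged. */
    static boolean allSubtasksRegistered(List<String> logLines, String stateName, int parallelism) {
        return countRegistrations(logLines, stateName) >= parallelism;
    }

    public static void main(String[] args) {
        // Hypothetical log excerpt; job id and state name are placeholders.
        List<String> lines = Arrays.asList(
                "{\"level\":\"DEBUG\",\"msg\":\"Key value state registered for job job-1 under name my-state\"}",
                "{\"level\":\"DEBUG\",\"msg\":\"Key value state registered for job job-1 under name my-state\"}",
                "{\"level\":\"DEBUG\",\"msg\":\"Lookup key-value state for job job-1 with registration name my-state\"}");

        System.out.println("registrations: " + countRegistrations(lines, "my-state"));
        System.out.println("all 3 subtasks registered: " + allSubtasksRegistered(lines, "my-state", 3));
    }
}
```

Fewer registrations than the parallelism does not necessarily mean the cluster is misconfigured; as it turned out in this thread, it can also mean that one subtask never received any keys.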

Re: Queryable State

2017-05-02 Thread Chet Masterson
Can do. Any advice on where the trace prints should go in the task manager
source code?

BTW - How do I know I have a correctly configured cluster? Is there a set of
messages in the job / task manager logs that indicate all required
connectivity is present? I know I use the UI to make sure all the task
managers are present, and that the job is running on all of them, but is
there some verbiage in the logs that indicates the job manager can talk to
all the task managers, and vice versa?

Thanks!


02.05.2017, 06:03, "Ufuk Celebi" :
> Hey Chet! I'm wondering why you are only seeing 2 registration
> messages for 3 task managers. Unfortunately, there is no log message
> at the task managers when they send out the notification. Is it
> possible for you to run a remote debugger with the task managers or
> build a custom Flink version with the appropriate log messages on the
> task manager side?
> – Ufuk

Re: Queryable State

2017-04-28 Thread Chet Masterson
Any insight here? I've got a situation where a key value state on a task
manager is being registered with the job manager, but when I try to query
it, the job manager responds it doesn't know the location of the key value
state...

Re: Queryable State

2017-04-26 Thread Chet Masterson
After setting the logging to DEBUG on the job manager, I learned four
things:

(On the message formatting below, I have the Flink logs formatted into JSON
so I can import them into Kibana)

1. The appropriate key value state is registered in both parallelism = 1
and parallelism = 3 environments. In parallelism = 1, I saw one
registration message in the log; in parallelism = 3, I saw two
registration messages:
{"level":"DEBUG","time":"2017-04-26 15:54:55,254",
"class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"",
"msg":"Key value state registered for job  under name "}

2. When I issued the query in both parallelism = 1 and parallelism = 3
environments, I saw "Lookup key-value state for job  with
registration name ". In parallelism = 1, I saw 1 log message, in
parallelism = 3, I saw two identical messages.

3. I saw no other messages in the job manager log that seemed relevant.

4. When issuing the query in parallelism = 3, I continued to get the error:
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a
message of null.

Thanks!


26.04.2017, 09:52, "Ufuk Celebi" :
> Thanks! Your config looks good to me.
>
> Could you please set the log level org.apache.flink.runtime.jobmanager to
> DEBUG?
>
> log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
>
> Then we can check whether the JobManager logs the registration of the
> state instance with the respective name in the case of parallelism >
> 1?
>
> Expected output is something like this: "Key value state registered
> for job ${msg.getJobId} under name ${msg.getRegistrationName}."
>
> – Ufuk

Re: Queryable State

2017-04-26 Thread Chet Masterson
Ok...more information.

1. Built a fresh cluster from the ground up. Started testing queryable
state at each step.
2. When running under any configuration of task managers and job managers
where parallelism = 1, the queries execute as expected.
3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
manager) feeding off a kafka topic partitioned three ways, queries will
always fail, returning error
(org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
error message of null.
4. I do know my state is as expected on the cluster. Liberal use of trace
prints shows my state managed on the jobs is as I expect. However, I cannot
query it externally.
5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
is configured by using the job manager UI.
6. My flink-conf.yaml:

jobmanager.rpc.address: flink01
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256

taskmanager.heap.mb: 512
taskmanager.data.port: 6121
taskmanager.numberOfTaskSlots: 1
taskmanager.memory.preallocate: false

parallelism.default: 1
blob.server.port: 6130

jobmanager.web.port: 8081
query.server.enable: true

7. I do know my job is indeed running in parallel, from trace prints going
to the task manager logs.

Do I need a backend configured when running in parallel for the queryable
state? Do I need a shared temp directory on the task managers?

THANKS!


25.04.2017, 04:24, "Ufuk Celebi" :
> It's strange that the rpc port is set to 3 when you use a
> standalone cluster and configure 6123 as the port. I'm pretty sure
> that the config has not been updated.
>
> But everything should work as you say when you point it to the correct
> jobmanager address and port. Could you please post the complete
> stacktrace you get instead of the message you log?
>
> On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson wrote:
> > More information:
> >
> > 0. I did remove the query.server.port and query.server.enabled from all
> > flink-conf.yaml files, and restarted the cluster.
> >
> > 1. The Akka error doesn't seem to have anything to do with the problem.
> > If I point my query client at an IP address with no Flink server running
> > at all, I get that error. It seems to be a (side effect?) timeout for
> > "no flink service is listening on the port you told me to check"
> >
> > 2. I did notice (using the Flink Web UI) that even with the config file
> > changes in 0, and no changes to the default flink-conf.yaml
> > jobmanager.rpc.port (6123), on my cluster jobmanager.rpc.port is set
> > to 3.
> >
> > 3. If I do send a query using the jobmanager.rpc.address and the
> > jobmanager.rpc.port as displayed in the Flink Web UI, the connection
> > from the client to Flink will be initiated and completed. When I try to
> > execute the query (code below), it will fail, and will get trapped. When
> > I look at the error message returned (e.getMessage() below), it is
> > simply 'null':
> >
> > try {
> >   byte[] serializedResult = Await.result(future, new
> >       FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
> >   // de-serialize, commented out for testing
> >   return null;
> > }
> > catch (Exception e) {
> >   logger.error("Queryable State Error: "
> >       +key+"-"+flinkJobID+"-"+stateName+" Error: "+e.getMessage());
> >   return null;
> > }
> >
> > Should I be sending the query to the job manager on the job manager's
> > rpc port when flink is clustered?
> >
> > ALSO - I do know the state name I am trying to query exists, is
> > populated, and the job id exists. I also know the task managers are
> > communicating with the job managers (task managers data port: 6121) and
> > processed the data that resulted in the state variable I am trying to
> > query being populated. All this was logged.
> >
> > 24.04.2017, 10:34, "Ufuk Celebi" :
> > > Hey Chet! You can remove
> > >
> > > query.server.port: 6123
> > > query.server.enable: true
> > >
> > > That shouldn't cause the Exception we see here though. I'm actually
> > > not sure what is causing the PduCodecException. Could this be related
> > > to different Akka versions being used in Flink and your client code?
> > > [1] Is it possible for you to check this?
> > >
> > > – Ufuk
> > >
> > > [1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
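
The quoted catch block logs only e.getMessage(), which is null for any exception that was constructed without a message, so the log line hides even the exception type. A minimal sketch of capturing the full stack trace, as Ufuk asks for, might look like this. It uses only the JDK; the class and method names are hypothetical, and the logger call from the original snippet is replaced by plain printing.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class ExceptionDetails {

    /** Renders the exception type, message, stack frames and causes as one string. */
    static String describe(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }

    public static void main(String[] args) {
        // An exception created without a message, similar to what the query returned.
        Exception e = new IllegalStateException();

        System.out.println("getMessage(): " + e.getMessage());
        System.out.println("full details:");
        System.out.println(describe(e));
    }
}
```

Most logging frameworks also accept the exception itself as the last argument of the error call, which prints the same information without the manual conversion.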

Re: Queryable State

2017-04-24 Thread Ufuk Celebi
You should be able to use queryable state w/o any changes to the
default config. The `query.server.port` option defines the port of the
queryable state server that serves the state on the task managers and
it is enabled by default.

The important thing is to configure the client to discover the
JobManager and everything else should work out of the box. Can you
please

1) Use the default config and verify in the JobManager logs that the
JobManager listens on port 6123 (the default JM port) and that all
expected TaskManagers connect to it?

2) Share the code for how you configure the QueryableStateClient?

– Ufuk


On Mon, Apr 24, 2017 at 1:45 PM, Chet Masterson
 wrote:
> I moved up from running queryable state on a standalone Flink instance to a
> several node cluster. My queries don't seem to be responding when I execute
> them on the cluster. A few questions:
>
> 1. The error I am getting:
> WARN [ReliableDeliverySupervisor] Association with remote system
> [akka.tcp://flink@x.x.x.x:6123] has failed, address is now gated for [5000]
> ms. Reason: [Association failed with [akka.tcp://flink@x.x.x.x:6123]] Caused
> by: [Connection refused: /x.x.x.x:6123]
> 2017/04/23 20:19:01.016 ERROR Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@x.x.x.x:6123/),
> Path(/user/jobmanager)]
>
> I assume this is because Flink is not servicing requests on port :6123. I am
> using the default RPC ports defined in flink-conf.yaml. I confirm nothing is
> listening on port 6123 by using netstat on the flink nodes.
>
> 2. I make the following changes on all nodes to flink-conf.yaml, then
> restart the cluster
>
> #jobmanager.rpc.port: 6123
> query.server.port: 6123
> query.server.enable: true
>
> 3. Now port 6123 is open, as viewed from netstat.
>
> My question: what is the proper configuration for servicing external queries
> when running in a cluster? Can I use jobmanager.rpc.port: 6123 which works
> standalone, do I have to add query.server.port and query.server.enable?
> Which port should I be using?
>
>
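
The netstat check mentioned in the quoted message can also be done from the machine that runs the query client, which additionally rules out firewall or routing problems. The following is a rough sketch using only the JDK; the class name, method name, and the 2 second timeout are arbitrary choices, and 6123 is simply the default JobManager port named in this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6123;
        System.out.println(host + ":" + port + " listening: " + isListening(host, port, 2000));
    }
}
```

A successful connection only shows that something accepts TCP connections on that port; it does not confirm that the listener is the JobManager.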


Re: Queryable State

2017-03-13 Thread Nico Kruber
Hi Chet,
the following things may cause the error you mentioned:
* the job ID of the query does not match the ID of the running job
* the job is not running anymore
* the queryableStateName does not match the string given to
  setQueryable("query-name")
* the queried key does not exist (note that you need to give its hash(!) in
  the getKvState() method)
* something is wrong with the keyAndNamespace serialization - it should
  look like the following in Java:

    final byte[] serializedKey =
        KvStateRequestSerializer.serializeKeyAndNamespace(
            key, keySerializer,
            VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

I have only tested the Java API though which you could try for reference.

Regards
Nico

On Monday, 13 March 2017 12:14:26 CET Fabian Hueske wrote:
> Hi Chet,
> 
> Nico or Ufuk (in CC) should be able to help you.
> 
> Thanks,
> Fabian
> 
> 2017-03-13 11:23 GMT+01:00 Chet Masterson :
> > Any guidance on troubleshooting error
> > 
> > Error: No KvStateLocation found for KvState instance with name X
> > 
> > when trying to make a queryable state call in Flink 1.2.0?
> > 
> > I do know the server is receiving the call made from the remote client.
> > (query.server.enable = true on the server in flink-conf.yaml, that is the
> > only setting I changed there)
> > 
> > I'm following the guidelines documented here:
> > 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/
> > queryable_state.html
> > 
> > under 'Managed Keyed State'
> > 
> > I do understand this feature is 'highly experimental', and my code is in
> > Scala, so I am also following the instructions under 'Note for Scala
> > Users'
> > in the documentation.





Re: Queryable State

2017-03-13 Thread Fabian Hueske
Hi Chet,

Nico or Ufuk (in CC) should be able to help you.

Thanks,
Fabian



2017-03-13 11:23 GMT+01:00 Chet Masterson :

> Any guidance on troubleshooting error
>
> Error: No KvStateLocation found for KvState instance with name X
>
> when trying to make a queryable state call in Flink 1.2.0?
>
> I do know the server is receiving the call made from the remote client.
> (query.server.enable = true on the server in flink-conf.yaml, that is the
> only setting I changed there)
>
> I'm following the guidelines documented here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/
> queryable_state.html
>
> under 'Managed Keyed State'
>
> I do understand this feature is 'highly experimental', and my code is in
> Scala, so I am also following the instructions under 'Note for Scala Users'
> in the documentation.
>
>


Re: Queryable State and Windows

2017-02-26 Thread Konstantin Knauf
I just want to add another workaround, which does not need a
self-compiled version. You can use a TimeWindow with CountTrigger.of(1)
combined with a FoldFunction for pre-aggregation and a
RichWindowFunction to update the queryable state. Additionally, you need
a TimeWindow for the final results. So you are doubling the amount of
state as well as computation, but depending on the circumstances this
might be preferable to tweaking Flink 1.2.

I think Jamie Grier did something similar in one of his presentations on
the topic.

Cheers,

Konstantin

On 23.01.2017 15:39, Ufuk Celebi wrote:
> This is not possible at the moment. We discussed this a couple of
> times before, but in the end did not want to expose it with the
> initial version, because the interfaces are still very raw. This is
> definitely on the agenda though.
> 
> As a workaround you would have to build a custom Flink version that
> calls `setQueryable` on the state descriptors of the WindowOperator.
> If there is an easy non intrusive way to activate this for the
> upcoming 1.2 version, I will try to do it.
> 
> 
> 
> On Mon, Jan 23, 2017 at 2:46 PM, Joe Olson  wrote:
>> From what I've read in the documentation, and from the examples I've seen,
>> in order to make state queryable externally to Flink, the state descriptor
>> variables need access to the Flink runtime context.
>>
>> This means the stream processor has to have access to the 'Rich' level
>> objects - 'RichFlatMap' for example. All the SNAPSHOT1.2 queryable state
>> examples I have seen revolve around RichFlatMap.
>>
>> Is there a way to get the runtime context exposed so that you can have state
>> descriptor variables queryable from within a Flink window, while the window
>> is loading?
>>
>> My processor is built around the following:
>>
>> .addSource(new FlinkKafkaConsumer010())
>> .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
>> .keyBy()
>> .window(GlobalWindows.create())
>> .trigger(new myTrigger())
>> .apply(new myWindowFunction())
>> .addSink(new mySink())
>>
>> The only rich object in this chain are available in the apply
>> (RichWindowFunction). But that is too late - I want to be able to query out
>> whats in the window while it is filling. I know I have access to onElement
>> in the trigger, and I can set up the state descriptor variables there, but
>> the variables don't seem to have exposure to the runtime environment within
>> the trigger.
>>
>> Is there a way to get queryable state within a Flink window while it is
>> filling?
>>
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Re: Queryable State

2017-01-27 Thread Dawid Wysakowicz
Hi Nico,

No problem at all, I've already presented my showcase with
ValueStateDescriptor.

Anyway, if I can help you somehow with the queryable state, let me know. I
will be happy to contribute some code.

2017-01-25 14:47 GMT+01:00 Nico Kruber :

> Hi Dawid,
> sorry for the late reply, I was fixing some issues for queryable state and
> may
> now have gotten to the point of your error: you may be seeing a race
> condition
> with the MemoryStateBackend state backend (the default) as described here:
> https://issues.apache.org/jira/browse/FLINK-5642
> I'm currently working on a fix.
>
> KvStateRequestSerializer#deserializeList(), however, is the right
> function to
> de-serialise list state! - KvStateRequestSerializer#deserializeValue()
> will
> not work!
>
> Thanks for the tip regarding KvStateRequestSerializer#serializeList, this
> was
> indeed not used since the list state backends had their own serialisation
> function.
> We removed KvStateRequestSerializer#serializeList as well as the queryable
> list state sink for 1.2 and up.
>
>
> Nico
>
> On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> > Hi Nico, Ufuk,
> >
> > Thanks for diving into this issue.
> >
> > @Nico
> >
> > I don't think that's the problem. The code can be exactly reproduced in
> > java. I am using other constructor for ListDescriptor than you did:
> >
> > You used:
> > > public ListStateDescriptor(String name, TypeInformation typeInfo)
> >
> > While I used:
> > >  public ListStateDescriptor(String name, Class typeClass)
> >
> > I think the problem is with the way I deserialized the value on the
> > QueryClient side as I tried to use:
> >
> >
> >
> > KvStateRequestSerializer.deserializeList(serializedResult, {
> >
> >   TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]()
> {})
> >
> > .createSerializer(new ExecutionConfig)
> >
> > })
> >
> > I have not checked it, but now I suspect this code would work:
> > > KvStateRequestSerializer.deserializeValue(serializedResult, {
> > >
> > >   TypeInformation.of(new
> > >
> > > TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> > >
> > > .createSerializer(new ExecutionConfig)
> > >
> > > })
> >
> > Regarding removing the queryable state list I agree, using it seems
> > pointless. Moreover while removing it I would take a second look at those
> >
> > functions:
> > > KvStateRequestSerializer::deserializeList
> >
> >  KvStateRequestSerializer.serializeList
> >
> >
> > As I think they are not used at all even right now. Thanks for your time.
> >
> > Regards
> > Dawid Wysakowicz
> >
> > 2017-01-16 13:25 GMT+01:00 Nico Kruber :
> > > Hi Dawid,
> > > regarding the original code, I couldn't reproduce this with the Java
> code
> > > I
> > > wrote and my guess is that the second parameter of the
> ListStateDescriptor
> > > is
> > >
> > > wrong:
> > >   .asQueryableState(
> > >
> > > "type-time-series-count",
> > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > >
> > >   "type-time-series-count",
> > >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > >
> > > this should rather be
> > >
> > > TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}
> > >
> > > as in the query itself. It sounds strange to me that you don't get ant
> > > ClassCastException or a compile-time error due to the type being wrong
> but
> > > I
> > > lack some Scala knowledge to get to the ground of this.
> > >
> > >
> > > Regarding the removal of the queryable list state "sink", I created a
> JIRA
> > > issue for it and will open a PR:
> > > https://issues.apache.org/jira/browse/FLINK-5507
> > >
> > >
> > > Nico
> > >
> > > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > > Hi Nico,
> > > >
> > > > Recently I've tried the queryable state a bit differently, by using
> > > > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > > > util.ArrayList and it works as expected.
> > > >
> > > > The non-working example you can browse here:
> > > > https://github.com/dawidwys/flink-intro/tree/
> > >
> > > c66f01117b0fe3c0adc8923000543a7
> > >
> > > > 0a6fe2219 The working example here:
> > > > https://github.com/dawidwys/flink-intro/tree/master
> > > > (The QueryableJob is in module flink-queryable-job and the
> QueryClient
> > > > in
> > > > flink-state-server)
> > > >
> > > > Sure, I am aware of the downfall of the ListState. I need it just for
> > > > presentational purpose, but you may be right there might not be any
> > > > production use for this state and it should be removed.
> > > > Maybe the problem is just with the ListState and removing it would
> > >
> > > resolve
> > >
> > > > also my problem :)
> > > >
> > > > Regards
> > > > Dawid Wysakowicz
> > > >
> > > > 2017-01-13 18:50 GMT+01:00 Nico Kruber :
> > > > > Hi Dawid,
> > > > > I'll try to reproduce the error in the next couple of 

Re: Queryable State

2017-01-25 Thread Nico Kruber
Hi Dawid,
sorry for the late reply, I was fixing some issues for queryable state and may 
now have gotten to the point of your error: you may be seeing a race condition 
with the MemoryStateBackend state backend (the default) as described here:
https://issues.apache.org/jira/browse/FLINK-5642
I'm currently working on a fix.

KvStateRequestSerializer#deserializeList(), however, is the right function to 
de-serialise list state! - KvStateRequestSerializer#deserializeValue() will 
not work!
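
To illustrate why the two calls are not interchangeable, here is a small
stdlib-only Java sketch. It assumes that queryable list state comes back as
the elements joined by a single separator byte, while a serializer for a
whole java.util.List expects an element count first. The class name, the
method names and the separator value are made up for this illustration, and
no Flink classes are involved.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListStateLayoutSketch {

    // Assumed separator byte between elements (hypothetical value for this sketch).
    static final byte SEPARATOR = (byte) ',';

    // Assumed list-state layout: element, separator, element, ... with no element count.
    static byte[] writeSeparated(List<Integer> values) {
        int size = values.size();
        ByteBuffer buf = ByteBuffer.allocate(size * 4 + Math.max(0, size - 1));
        for (int i = 0; i < size; i++) {
            if (i > 0) {
                buf.put(SEPARATOR);
            }
            buf.putInt(values.get(i));
        }
        return buf.array();
    }

    // Reader that matches the separated layout: read elements until the input
    // is exhausted, skipping one separator byte between two elements.
    static List<Integer> readSeparated(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        List<Integer> result = new ArrayList<>();
        while (buf.hasRemaining()) {
            result.add(buf.getInt());
            if (buf.hasRemaining()) {
                buf.get(); // skip the separator
            }
        }
        return result;
    }

    // Reader for a count-prefixed layout: an int count followed by the elements.
    // This is what a serializer for the whole list would expect in this sketch.
    static List<Integer> readCountPrefixed(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int count = buf.getInt();
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(buf.getInt());
        }
        return result;
    }

    // True only if the count-prefixed reader reproduces the original list
    // from bytes written in the separated layout.
    static boolean countPrefixedReadMatches(List<Integer> values) {
        try {
            return readCountPrefixed(writeSeparated(values)).equals(values);
        } catch (BufferUnderflowException e) {
            return false; // the reader ran past the end of the data
        }
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(7, 8, 9);
        System.out.println(readSeparated(writeSeparated(values)));
        System.out.println(countPrefixedReadMatches(values));
    }
}
```

The matching reader round-trips the list, whereas the count-prefixed reader
treats the first element as a count and then misreads the separators as
data. If the real layout is as assumed, that is the kind of mismatch you
would hit when handing list state to a value deserializer.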

Thanks for the tip regarding KvStateRequestSerializer#serializeList, this was 
indeed not used since the list state backends had their own serialisation 
function.
We removed KvStateRequestSerializer#serializeList as well as the queryable 
list state sink for 1.2 and up.


Nico

On Monday, 16 January 2017 14:47:59 CET Dawid Wysakowicz wrote:
> Hi Nico, Ufuk,
> 
> Thanks for diving into this issue.
> 
> @Nico
> 
> I don't think that's the problem. The code can be exactly reproduced in
> java. I am using other constructor for ListDescriptor than you did:
> 
> You used:
> > public ListStateDescriptor(String name, TypeInformation typeInfo)
> 
> While I used:
> >  public ListStateDescriptor(String name, Class typeClass)
> 
> I think the problem is with the way I deserialized the value on the
> QueryClient side as I tried to use:
> 
> 
> 
> KvStateRequestSerializer.deserializeList(serializedResult, {
> 
>   TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
> 
> .createSerializer(new ExecutionConfig)
> 
> })
> 
> I have not checked it, but now I suspect this code would work:
> > KvStateRequestSerializer.deserializeValue(serializedResult, {
> > 
> >   TypeInformation.of(new
> > 
> > TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> > 
> > .createSerializer(new ExecutionConfig)
> > 
> > })
> 
> Regarding removing the queryable state list I agree, using it seems
> pointless. Moreover while removing it I would take a second look at those
> 
> functions:
> > KvStateRequestSerializer::deserializeList
> 
>  KvStateRequestSerializer.serializeList
> 
> 
> As I think they are not used at all even right now. Thanks for your time.
> 
> Regards
> Dawid Wysakowicz
> 
> 2017-01-16 13:25 GMT+01:00 Nico Kruber :
> > Hi Dawid,
> > regarding the original code, I couldn't reproduce this with the Java code
> > I
> > wrote and my guess is that the second parameter of the ListStateDescriptor
> > is
> > 
> > wrong:
> >   .asQueryableState(
> >   
> > "type-time-series-count",
> > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > 
> >   "type-time-series-count",
> >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > 
> > this should rather be
> > 
> > TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}
> > 
> > as in the query itself. It sounds strange to me that you don't get ant
> > ClassCastException or a compile-time error due to the type being wrong but
> > I
> > lack some Scala knowledge to get to the ground of this.
> > 
> > 
> > Regarding the removal of the queryable list state "sink", I created a JIRA
> > issue for it and will open a PR:
> > https://issues.apache.org/jira/browse/FLINK-5507
> > 
> > 
> > Nico
> > 
> > On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > > Hi Nico,
> > > 
> > > Recently I've tried the queryable state a bit differently, by using
> > > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > > util.ArrayList and it works as expected.
> > > 
> > > The non-working example you can browse here:
> > > https://github.com/dawidwys/flink-intro/tree/
> > 
> > c66f01117b0fe3c0adc8923000543a7
> > 
> > > 0a6fe2219 The working example here:
> > > https://github.com/dawidwys/flink-intro/tree/master
> > > (The QueryableJob is in module flink-queryable-job and the QueryClient
> > > in
> > > flink-state-server)
> > > 
> > > Sure, I am aware of the downfall of the ListState. I need it just for
> > > presentational purpose, but you may be right there might not be any
> > > production use for this state and it should be removed.
> > > Maybe the problem is just with the ListState and removing it would
> > 
> > resolve
> > 
> > > also my problem :)
> > > 
> > > Regards
> > > Dawid Wysakowicz
> > > 
> > > 2017-01-13 18:50 GMT+01:00 Nico Kruber :
> > > > Hi Dawid,
> > > > I'll try to reproduce the error in the next couple of days. Can you
> > 
> > also
> > 
> > > > share
> > > > the value deserializer you use? Also, have you tried even smaller
> > 
> > examples
> > 
> > > > in
> > > > the meantime? Did they work?
> > > > 
> > > > As a side-note in general regarding the queryable state "sink" using
> > > > ListState
> > > > (".asQueryableState(, ListStateDescriptor)"): everything that
> > 
> > enters
> > 
> > > > this operator will be stored forever and never cleaned. Eventually, it
> > > > will
> > > > pile up too much 

Re: Queryable State and Windows

2017-01-23 Thread Ufuk Celebi
This is not possible at the moment. We discussed this a couple of
times before, but in the end did not want to expose it with the
initial version, because the interfaces are still very raw. This is
definitely on the agenda though.

As a workaround you would have to build a custom Flink version that
calls `setQueryable` on the state descriptors of the WindowOperator.
If there is an easy, non-intrusive way to activate this for the
upcoming 1.2 version, I will try to do it.



On Mon, Jan 23, 2017 at 2:46 PM, Joe Olson  wrote:
> From what I've read in the documentation, and from the examples I've seen,
> in order to make state queryable externally to Flink, the state descriptor
> variables need access to the Flink runtime context.
>
> This means the stream processor has to have access to the 'Rich' level
> objects - 'RichFlatMap' for example. All the SNAPSHOT1.2 queryable state
> examples I have seen revolve around RichFlatMap.
>
> Is there a way to get the runtime context exposed so that you can have state
> descriptor variables queryable from within a Flink window, while the window
> is loading?
>
> My processor is built around the following:
>
> .addSource(new FlinkKafkaConsumer010())
> .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
> .keyBy()
> .window(GlobalWindows.create())
> .trigger(new myTrigger())
> .apply(new myWindowFunction())
> .addSink(new mySink())
>
> The only rich object in this chain are available in the apply
> (RichWindowFunction). But that is too late - I want to be able to query out
> whats in the window while it is filling. I know I have access to onElement
> in the trigger, and I can set up the state descriptor variables there, but
> the variables don't seem to have exposure to the runtime environment within
> the trigger.
>
> Is there a way to get queryable state within a Flink window while it is
> filling?
>


Re: Queryable State

2017-01-16 Thread Dawid Wysakowicz
Hi Nico, Ufuk,

Thanks for diving into this issue.

@Nico

I don't think that's the problem. The same behaviour can be reproduced
exactly in Java. I am using a different ListStateDescriptor constructor than
you did:

You used:

> public ListStateDescriptor(String name, TypeInformation typeInfo)
>

While I used:

>  public ListStateDescriptor(String name, Class typeClass)


I think the problem is with the way I deserialized the value on the
QueryClient side as I tried to use:

    KvStateRequestSerializer.deserializeList(serializedResult, {
      TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})
        .createSerializer(new ExecutionConfig)
    })


I have not checked it, but now I suspect this code would work:

> KvStateRequestSerializer.deserializeValue(serializedResult, {
>   TypeInformation.of(new
> TypeHint[util.List[KeyedDataPoint[lang.Integer]]]() {})
> .createSerializer(new ExecutionConfig)
> })


Regarding removing the queryable list state, I agree: using it seems
pointless. Moreover, while removing it I would take a second look at these
functions:

> KvStateRequestSerializer.deserializeList
> KvStateRequestSerializer.serializeList

since I think they are not used at all, even right now. Thanks for your time.

Regards
Dawid Wysakowicz

2017-01-16 13:25 GMT+01:00 Nico Kruber :

> Hi Dawid,
> regarding the original code, I couldn't reproduce this with the Java code I
> wrote and my guess is that the second parameter of the ListStateDescriptor
> is
> wrong:
>
>   .asQueryableState(
> "type-time-series-count",
> new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
>   "type-time-series-count",
>   classOf[KeyedDataPoint[java.lang.Integer]]))
>
> this should rather be
>
> TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {}
>
> as in the query itself. It sounds strange to me that you don't get ant
> ClassCastException or a compile-time error due to the type being wrong but
> I
> lack some Scala knowledge to get to the ground of this.
>
>
> Regarding the removal of the queryable list state "sink", I created a JIRA
> issue for it and will open a PR:
> https://issues.apache.org/jira/browse/FLINK-5507
>
>
> Nico
>
> On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> > Hi Nico,
> >
> > Recently I've tried the queryable state a bit differently, by using
> > ValueState with a value of a util.ArrayList and a ValueSerializer for
> > util.ArrayList and it works as expected.
> >
> > The non-working example you can browse here:
> > https://github.com/dawidwys/flink-intro/tree/
> c66f01117b0fe3c0adc8923000543a7
> > 0a6fe2219 The working example here:
> > https://github.com/dawidwys/flink-intro/tree/master
> > (The QueryableJob is in module flink-queryable-job and the QueryClient in
> > flink-state-server)
> >
> > Sure, I am aware of the downfall of the ListState. I need it just for
> > presentational purpose, but you may be right there might not be any
> > production use for this state and it should be removed.
> > Maybe the problem is just with the ListState and removing it would
> resolve
> > also my problem :)
> >
> > Regards
> > Dawid Wysakowicz
> >
> > 2017-01-13 18:50 GMT+01:00 Nico Kruber :
> > > Hi Dawid,
> > > I'll try to reproduce the error in the next couple of days. Can you
> also
> > > share
> > > the value deserializer you use? Also, have you tried even smaller
> examples
> > > in
> > > the meantime? Did they work?
> > >
> > > As a side-note in general regarding the queryable state "sink" using
> > > ListState
> > > (".asQueryableState(, ListStateDescriptor)"): everything that
> enters
> > > this operator will be stored forever and never cleaned. Eventually, it
> > > will
> > > pile up too much memory and is thus of limited use. Maybe it should
> even
> > > be
> > > removed from the API.
> > >
> > >
> > > Nico
> > >
> > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > > Hey Ufuk.
> > > > Did you maybe had a while to have a look at that problem?
> > > >
> > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > > > > Hey Dawid! Thanks for reporting this. I will try to have a look
> over
> > > > > the course of the day. From a first impression, this seems like a
> bug
> > > > > to me.
> > > > >
> > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > >
> > > > >  wrote:
> > > > > > Hi I was experimenting with the Query State feature and I have
> some
> > > > >
> > > > > problems
> > > > >
> > > > > > querying the state.
> > > > > >
> > > > > > The code which I use to produce the queryable state is:
> > > > > > env.addSource(kafkaConsumer).map(
> > > > > >
> > > > > >   e => e match {
> > > > > >
> > > > > > case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > > case ButtonClickEvent(_, _, t) => 

Re: Queryable State

2017-01-16 Thread Nico Kruber
Hi Dawid,
regarding the original code, I couldn't reproduce this with the Java code I 
wrote and my guess is that the second parameter of the ListStateDescriptor is 
wrong:

  .asQueryableState(
"type-time-series-count",
new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
  "type-time-series-count",
  classOf[KeyedDataPoint[java.lang.Integer]]))

this should rather be 

TypeInformation.of(new TypeHint[KeyedDataPoint[lang.Integer]]() {})

as in the query itself. It sounds strange to me that you don't get any
ClassCastException or a compile-time error due to the type being wrong, but I
lack the Scala knowledge to get to the bottom of this.
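
For what it is worth, here is a stdlib-only Java sketch of the difference
between the two ways of describing a type. A Class object only carries the
raw class, while an anonymous subclass of a generic base class (the pattern
that TypeHint-style helpers rely on) keeps the full type argument available
through reflection. The Hint class below is a hypothetical stand-in written
for this example, not Flink's TypeHint, and the sketch makes no claim about
how ListStateDescriptor treats either constructor.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

final class TypeCaptureSketch {

    // Hypothetical stand-in for a TypeHint-style helper. An anonymous
    // subclass records its type argument in its generic superclass.
    abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }

        Type captured() {
            return captured;
        }
    }

    // A Class object for a generic value only describes the raw class.
    static Class<?> rawClassOfList() {
        List<Integer> list = new ArrayList<>();
        return list.getClass();
    }

    // The anonymous subclass ("{}") keeps List<Integer> including its argument.
    static Type capturedListOfInteger() {
        return new Hint<List<Integer>>() {}.captured();
    }

    public static void main(String[] args) {
        System.out.println(rawClassOfList());
        System.out.println(capturedListOfInteger());
    }
}
```

The first call reports only the raw class and has no record of Integer. The
second returns a ParameterizedType whose raw type is List and whose single
argument is Integer.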


Regarding the removal of the queryable list state "sink", I created a JIRA 
issue for it and will open a PR:
https://issues.apache.org/jira/browse/FLINK-5507


Nico

On Saturday, 14 January 2017 14:03:41 CET Dawid Wysakowicz wrote:
> Hi Nico,
> 
> Recently I've tried the queryable state a bit differently, by using
> ValueState with a value of a util.ArrayList and a ValueSerializer for
> util.ArrayList and it works as expected.
> 
> The non-working example you can browse here:
> https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a7
> 0a6fe2219 The working example here:
> https://github.com/dawidwys/flink-intro/tree/master
> (The QueryableJob is in module flink-queryable-job and the QueryClient in
> flink-state-server)
> 
> Sure, I am aware of the downfall of the ListState. I need it just for
> presentational purpose, but you may be right there might not be any
> production use for this state and it should be removed.
> Maybe the problem is just with the ListState and removing it would resolve
> also my problem :)
> 
> Regards
> Dawid Wysakowicz
> 
> 2017-01-13 18:50 GMT+01:00 Nico Kruber :
> > Hi Dawid,
> > I'll try to reproduce the error in the next couple of days. Can you also
> > share
> > the value deserializer you use? Also, have you tried even smaller examples
> > in
> > the meantime? Did they work?
> > 
> > As a side-note in general regarding the queryable state "sink" using
> > ListState
> > (".asQueryableState(, ListStateDescriptor)"): everything that enters
> > this operator will be stored forever and never cleaned. Eventually, it
> > will
> > pile up too much memory and is thus of limited use. Maybe it should even
> > be
> > removed from the API.
> > 
> > 
> > Nico
> > 
> > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > > Hey Ufuk.
> > > Did you maybe had a while to have a look at that problem?
> > > 
> > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > > the course of the day. From a first impression, this seems like a bug
> > > > to me.
> > > > 
> > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > > 
> > > >  wrote:
> > > > > Hi I was experimenting with the Query State feature and I have some
> > > > 
> > > > problems
> > > > 
> > > > > querying the state.
> > > > > 
> > > > > The code which I use to produce the queryable state is:
> > > > > env.addSource(kafkaConsumer).map(
> > > > > 
> > > > >   e => e match {
> > > > >   
> > > > > case LoginClickEvent(_, t) => ("login", 1, t)
> > > > > case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > > > case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > > >   
> > > > >   }).keyBy(0).timeWindow(Time.seconds(1))
> > > > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3,
> > > > >   e2._3)))
> > > > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3,
> > 
> > e._2))
> > 
> > > > >   .keyBy("key")
> > > > >   .asQueryableState(
> > > > >   
> > > > > "type-time-series-count",
> > > > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > > > 
> > > > >   "type-time-series-count",
> > > > >   classOf[KeyedDataPoint[java.lang.Integer]]))
> > > > > 
> > > > > As you see it is a rather simple job, in which I try to count events
> > 
> > of
> > 
> > > > > different types in windows and then query by event type.
> > > > > 
> > > > > In client code I do:
> > > > > // Query Flink state
> > > > > val future = client.getKvState(jobId, "type-time-series-count",
> > > > > 
> > > > > key.hashCode, seralizedKey)
> > > > > 
> > > > > // Await async result
> > > > > val serializedResult: Array[Byte] = Await.result(
> > > > > 
> > > > >   future, new FiniteDuration(
> > > > >   
> > > > > 10,
> > > > > duration.SECONDS))
> > > > > 
> > > > > // Deserialize response
> > > > > val results = deserializeResponse(serializedResult)
> > > > > 
> > > > > results
> > > > >   
> > > > >   }
> > > > > 
> > > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > > > 
