[statefun] Does statefun support querying state via an HTTP endpoint?

2022-02-12 Thread casel.chen
Since statefun functions are already bound to an HTTP endpoint, I wondered whether statefun 
supports querying state through that HTTP endpoint?

[statefun] Add new state failure in Greetings example

2022-02-12 Thread casel.chen
Hello,


I am trying the Greetings example from the Flink Stateful Functions playground. Following 
the README.md guide, I tried to add the previous login location to the state 
after running the example successfully on my laptop.
I added one more state named "previous_login_location" in UserFn and printed it 
in GreetingsFn. When I build and run again, it fails with the following error. 
It seems the system can NOT find the newly added state.
Question: how do I add a new state in this case? How do I ensure state evolution? 
Thanks!


Caused by: 
org.apache.flink.statefun.sdk.java.storage.IllegalStorageAccessException: Error 
accessing state previous_login_location; State does not exist; make sure that 
this state was registered.

greeter-functions_1 |   at 
org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.slowGetCellOrThrow(ConcurrentAddressScopedStorage.java:102)

greeter-functions_1 |   at 
org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage.getCellOrThrow(ConcurrentAddressScopedStorage.java:78)
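
For what it's worth, that exception usually means the new ValueSpec was never listed in 
the function's StatefulFunctionSpec. Below is a minimal sketch with the Java SDK of where 
the registration has to happen; the type name, the placeholder value and the "existing 
specs" comment are made-up stand-ins, not copied from the playground code:

import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.message.Message;

public class UserFn implements StatefulFunction {

  // 1. Declare the new state as a ValueSpec ...
  static final ValueSpec<String> PREVIOUS_LOGIN_LOCATION =
      ValueSpec.named("previous_login_location").withUtf8StringType();

  static final TypeName TYPENAME = TypeName.typeNameFromString("greeter.fns/user");

  // 2. ... and list it in the function spec next to the specs that already exist;
  //    a spec that is missing here fails with IllegalStorageAccessException on access.
  static final StatefulFunctionSpec SPEC =
      StatefulFunctionSpec.builder(TYPENAME)
          .withValueSpecs(PREVIOUS_LOGIN_LOCATION /*, ...the existing specs... */)
          .withSupplier(UserFn::new)
          .build();

  @Override
  public CompletableFuture<Void> apply(Context context, Message message) {
    // 3. Read and update the new state through the address-scoped storage.
    String previous = context.storage().get(PREVIOUS_LOGIN_LOCATION).orElse("unknown");
    System.out.println("previous login location: " + previous);
    context.storage().set(PREVIOUS_LOGIN_LOCATION, "placeholder-location");
    return context.done();
  }
}

In my understanding, once the additional spec is registered this way the runtime picks it 
up on the next deployment; the previously registered state keeps its values and the new 
value simply starts out empty for every address, so no separate migration step should be 
needed.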




Re: Queryable State Deprecation

2022-02-12 Thread Frank Dekervel

Hello,

This is what we did, but I'm not quite convinced that it's the best way 
(maybe others could chime in?).


 * We have a Zalando Postgres cluster running next to the Flink
   cluster, so we can just use a JDBC sink for the state (a rough
   sketch of such a sink follows below). In theory we should be able
   to switch to exactly-once (we didn't do this so far).
 * Our stateful processor is a state machine that emits outgoing
   messages based on incoming messages. Sometimes we need to "rewind"
   the state machine to correctly process an incoming message. This
   forces us to keep some history of past messages.
 * We don't materialize the state directly, we only materialize the
   state changes, which are then re-materialized in Postgres. It took
   us some time to make this bug-free. When we were still debugging
   this, we read a savepoint to look into the state and compare it
   with what we had in Postgres.

In a Zalando Postgres cluster you can only write to the master, but for 
readers, if a small delay is acceptable, you can load-balance across the 
replicas.
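
To make the JDBC part above concrete, here is a rough sketch of what sinking the state 
changes into Postgres can look like with Flink's flink-connector-jdbc. The StateChange 
POJO, the state_changes table and all connection details are invented placeholders, and 
this is the plain at-least-once variant:

import java.sql.PreparedStatement;
import java.sql.Timestamp;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class StateChangeMaterializer {

    // simple carrier for one state transition emitted by the stateful processor
    public static class StateChange {
        public String machineId;
        public String newState;
        public java.time.Instant changeTime;
    }

    // attaches an upserting JDBC sink to the stream of state changes
    public static void materialize(DataStream<StateChange> changes) {
        changes.addSink(JdbcSink.sink(
            "INSERT INTO state_changes (machine_id, new_state, change_time) VALUES (?, ?, ?) "
                + "ON CONFLICT (machine_id) DO UPDATE SET "
                + "new_state = EXCLUDED.new_state, change_time = EXCLUDED.change_time",
            (PreparedStatement stmt, StateChange c) -> {
                stmt.setString(1, c.machineId);
                stmt.setString(2, c.newState);
                stmt.setTimestamp(3, Timestamp.from(c.changeTime));
            },
            JdbcExecutionOptions.builder()
                .withBatchSize(200)
                .withBatchIntervalMs(1000)
                .withMaxRetries(3)
                .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                // writes go to the Postgres primary; readers can be load-balanced to replicas
                .withUrl("jdbc:postgresql://postgres-primary:5432/appstate")
                .withDriverName("org.postgresql.Driver")
                .withUsername("flink")
                .withPassword("secret")
                .build()));
    }
}

For the exactly-once variant mentioned above, the connector also offers 
JdbcSink.exactlyOnceSink(...), which additionally needs an XA-capable data source.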


Greetings,

Frank


On 12.02.22 16:56, Jatti, Karthik wrote:


Hi Frank,

What sink did you end up choosing for materializing the state ?

Our use case for looking into queryable state is that we have many 
readers and very few writers (a reader-to-writer ratio in the 
1000s). Each consuming application (reader) needs a live view of a 
subset of the state, and these applications come online and go offline 
many times a day. What would be a good sink in such a scenario?


E.g., if the state of the Flink app were a dynamic table of product 
inventory built from Kafka streams of purchases and sales, a subset 
of this state would need to be available to 1000s of readers who have a 
live view of what is available in stock, with different aggregations 
and filters. These consumers come online and go offline, so they 
need to be able to restore their substate and continue to receive 
updates for it.


We are evaluating sinks but haven't narrowed it down to anything that 
looks like an obvious fit.


Thanks,

Karthik

*From: *Jatti, Karthik 
*Date: *Friday, February 11, 2022 at 6:00 PM
*To: *Frank Dekervel , user@flink.apache.org 
, dwysakow...@apache.org 

*Subject: *Re: Queryable State Deprecation

Thank you Frank and Dawid for providing the context here.

*From: *Frank Dekervel 
*Date: *Friday, February 4, 2022 at 9:56 AM
*To: *user@flink.apache.org 
*Subject: *Re: Queryable State Deprecation




Hello,

To give an extra datapoint: after a not-so-successful experiment with 
faust-streaming we moved our application to Flink. Since Flink's 
queryable state was apparently stagnant, we implemented what was 
needed to sink the state to an external data store for querying.


However, if queryable state were in good shape we would definitely have 
used it. Making sure that the state is always reflected correctly in 
our external system turned out to be non-trivial for a number of 
reasons: our state is not trivially convertible to rows in a table, 
and sometimes we had (due to our own bugs, but still) inconsistencies 
between the internal Flink state and the externally materialized 
state, especially after replaying from a checkpoint/savepoint after a 
crash (we cannot use exactly-once sinks in all occasions).


Also, obviously, we could not use Flink's partitioning/parallelism to 
help make state querying more scalable.


Greetings,
Frank

On 04.02.22 14:06, Dawid Wysakowicz wrote:

Hi Karthik,

The reason we deprecated it is that we lacked committers who
could spend time on getting queryable state to a production-ready
state. I might be speaking for myself here, but I think the
main use case for queryable state is to have an insight into
the current state of the application for debugging purposes. If it
is used for data serving purposes, we believe it's better to sink
the data into an external store, which can provide better
discoverability and more user-friendly APIs for querying the results.

As for debugging/tracking insights you may try to achieve similar
results with metrics.

Best,

Dawid

On 01/02/2022 16:36, Jatti, Karthik wrote:

Hi,

I see on the Flink roadmap that the Queryable State API is
scheduled to be deprecated, but I couldn't find much
information on Confluence or this mailing group's archives to
understand the background as to why it's being deprecated and
what would be an alternative. Any pointers to help me get
some more information here would be great.

Thanks,

Karthik





Re: Queryable State Deprecation

2022-02-12 Thread Jatti, Karthik
Hi Frank,

What sink did you end up choosing for materializing the state ?

Our use case for looking into queryable state is that we have many readers and 
very few writers (a reader-to-writer ratio in the 1000s). Each consuming 
application (reader) needs a live view of a subset of the state, and these 
applications come online and go offline many times a day. What would be a good 
sink in such a scenario?

E.g., if the state of the Flink app were a dynamic table of product inventory 
built from Kafka streams of purchases and sales, a subset of this state would 
need to be available to 1000s of readers who have a live view of what is 
available in stock, with different aggregations and filters. These 
consumers come online and go offline, so they need to be able to restore their 
substate and continue to receive updates for it.

We are evaluating sinks but haven't narrowed it down to anything that looks like 
an obvious fit.

Thanks,
Karthik


From: Jatti, Karthik 
Date: Friday, February 11, 2022 at 6:00 PM
To: Frank Dekervel , user@flink.apache.org 
, dwysakow...@apache.org 
Subject: Re: Queryable State Deprecation
Thank you Frank and Dawid for providing the context here.

From: Frank Dekervel 
Date: Friday, February 4, 2022 at 9:56 AM
To: user@flink.apache.org 
Subject: Re: Queryable State Deprecation


Hello,

To give an extra datapoint: after a not-so-successful experiment with 
faust-streaming we moved our application to Flink. Since Flink's queryable state 
was apparently stagnant, we implemented what was needed to sink the state to 
an external data store for querying.

However, if queryable state were in good shape we would definitely have used it. 
Making sure that the state is always reflected correctly in our external system 
turned out to be non-trivial for a number of reasons: our state is not 
trivially convertible to rows in a table, and sometimes we had (due to our own 
bugs, but still) inconsistencies between the internal Flink state and the 
externally materialized state, especially after replaying from a 
checkpoint/savepoint after a crash (we cannot use exactly-once sinks in all 
occasions).

Also, obviously, we could not use Flink's partitioning/parallelism to help 
make state querying more scalable.

Greetings,
Frank




On 04.02.22 14:06, Dawid Wysakowicz wrote:

Hi Karthik,

The reason we deprecated it is that we lacked committers who could spend 
time on getting queryable state to a production-ready state. I might be 
speaking for myself here, but I think the main use case for queryable state 
is to have an insight into the current state of the application for debugging 
purposes. If it is used for data serving purposes, we believe it's better to 
sink the data into an external store, which can provide better discoverability 
and more user-friendly APIs for querying the results.

As for debugging/tracking insights you may try to achieve similar results with 
metrics.
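
To illustrate the metrics route with a small, made-up example (the function, state and 
metric names below are placeholders, not from this thread): a keyed function can mirror an 
interesting piece of its state into a gauge, which is then visible through the normal 
metric reporters and the REST API for debugging, without any queryable state involved.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountingFn extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> count;
    // last state value seen by this subtask, mirrored here so the gauge can read it
    private transient volatile long lastSeenCount;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
        // expose a debugging insight as a metric instead of as queryable state
        getRuntimeContext().getMetricGroup().gauge("lastSeenCount",
                (Gauge<Long>) () -> lastSeenCount);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        lastSeenCount = updated;
        out.collect(value);
    }
}

This only exposes per-subtask values, of course; for a genuine data-serving view, the 
external-store approach described above remains the better fit.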

Best,

Dawid
On 01/02/2022 16:36, Jatti, Karthik wrote:
Hi,

I see on the Flink roadmap that the Queryable State API is scheduled to be 
deprecated, but I couldn't find much information on Confluence or this mailing 
group's archives to understand the background as to why it's being deprecated 
and what would be an alternative. Any pointers to help me get some more 
information here would be great.

Thanks,
Karthik





Need help implementing a custom `snapshotState` in KafkaSource & KafkaSourceReader

2022-02-12 Thread santosh joshi
We are migrating from FlinkKafkaConsumer to KafkaSource. We have disabled auto 
commit of offsets and instead commit them manually to some external store.

We override FlinkKafkaConsumer and then, on an overridden instance of 
KafkaFetcher, we try to store the offsets in some external store by overriding 
doCommitInternalOffsetsToKafka:

@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {
    // store the offsets in S3 instead of committing them back to Kafka
}

Now, in order to migrate, we tried copying/overriding KafkaSource, 
KafkaSourceBuilder and KafkaSourceReader, but that results in a lot of 
redundant code, which does not look right.

In the custom KafkaSourceReader I tried overriding snapshotState:

@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
    // custom logic to store the offsets in S3, e.g. using
    // split.getTopicPartition() and split.getStartingOffset() for each split
    return splits;
}

Is this correct, or is there another way to achieve the same?
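
For the configuration side of such a migration, here is a sketch of how the new source 
might be wired so that Flink neither auto-commits nor commits on checkpoint, and so that a 
restart can resume from externally stored offsets. The topic, servers and the 
loadOffsetsFromExternalStore() helper are hypothetical placeholders; the snapshotState 
override above would do the actual writing to the store.

import java.util.Map;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;

public class ExternalOffsetKafkaSource {

    public static KafkaSource<String> build() {
        // offsets previously persisted by snapshotState (hypothetical helper)
        Map<TopicPartition, Long> externalOffsets = loadOffsetsFromExternalStore();

        return KafkaSource.<String>builder()
            .setBootstrapServers("broker-1:9092")
            .setTopics("events")
            .setGroupId("my-group")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // don't let the source commit offsets back to Kafka on checkpoints ...
            .setProperty("commit.offsets.on.checkpoint", "false")
            // ... and don't let the Kafka consumer auto-commit either
            .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
            // start/resume from the offsets kept in the external store
            .setStartingOffsets(OffsetsInitializer.offsets(externalOffsets))
            .build();
    }

    private static Map<TopicPartition, Long> loadOffsetsFromExternalStore() {
        // placeholder: read the last persisted offsets back from S3 (or wherever)
        return Map.of();
    }
}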


I have asked a similar question on Stack Overflow.




Regards,

Santosh


[statefun] How to make the input/output topics live in different kafka clusters for flink stateful functions

2022-02-12 Thread casel.chen
Hi, I am a newbie to Flink Stateful Functions. 
I just want to ask a question: how can the input/output topics live in 
different Kafka clusters? 
Thanks!

Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation

2022-02-12 Thread M Singh
Thanks David for your insights.

Mans

On Saturday, February 12, 2022, 05:26:29 AM EST, David Anderson wrote:
Flink uses watermarks to indicate when a stream has become complete up through 
some point in time. Various operations on streams wait for watermarks in order 
to know when they can safely stop waiting for further input, and so go ahead 
and produce their results. These operations include event-time windowing, 
interval and temporal joins, pattern matching, and sorting (by timestamp).

Events that are late have timestamps less than or equal to the current 
watermark. They have missed their chance to influence the results of those 
operations that rely on watermarks for triggering. But otherwise, Flink doesn't 
care if events are late or not. It's not that late events are automatically 
dropped in all circumstances -- it's just that these temporal operations won't 
wait long enough to accommodate their extreme out-of-order-ness (lateness).

So yes, your ALL_EVENTS view will contain all of the events, including late 
ones.

When your job starts running, it takes some time for an initial watermark to be 
produced. During that period of time, the current watermark is NULL, and no 
events will be considered late.

Hope this helps clarify things.

Regards,
David
On Sat, Feb 12, 2022 at 12:01 AM M Singh wrote:

I thought a little more about your references, Martijn, and wanted to confirm 
one thing - the table is specifying the watermark, and the downstream view 
needs to check whether it wants all events or only the non-late events. Please 
let me know if my understanding is correct.

Thanks again for your references.

Mans

On Friday, February 11, 2022, 05:02:49 PM EST, M Singh wrote:
Hi Martijn:

Thanks for the reference.

My understanding was that if we use a watermark, then any event with an event 
time (in the above example) < event_time - 30 seconds will be dropped 
automatically.

My question [1] is: will the downstream (ALL_EVENTS) view, which is selecting 
the events from the table, receive events which are late? If late events are 
dropped at the table level, do we still need the second predicate check 
(ts > CURRENT_WATERMARK(ts)) to filter out late events at the view level?

If the table does not drop late events, will all downstream views etc. need 
to add this check (ts > CURRENT_WATERMARK(ts))?

I am still not clear on whether a downstream view needs to check for late 
events with this predicate, or whether it will never receive late events.

Thanks again for your time.

On Friday, February 11, 2022, 01:55:09 PM EST, Martijn Visser wrote:

Hi,

There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this would 
cover your questions [1].

Best regards,

Martijn

[1] 
https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md
On Fri, 11 Feb 2022 at 16:45, M Singh wrote:

Hi:

The flink docs 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
indicate that CURRENT_WATERMARK(rowtime) can return null:

Note that this function can return NULL, and you may have to consider this 
case. For example, if you want to filter out late data you can use:

WHERE
  CURRENT_WATERMARK(ts) IS NULL
  OR ts > CURRENT_WATERMARK(ts)

I have the following questions if the table is defined with a watermark, e.g.:

CREATE TABLE `MYEVENTS` (
  `name` STRING,
  `event_time` TIMESTAMP_LTZ(3),
  ...
  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
WITH (...)

1. If we define the watermark as above, will late events still be propagated 
to a view or table which is selecting from the MYEVENTS table:

CREATE TEMPORARY VIEW `ALL_EVENTS` AS
  SELECT * FROM MYEVENTS;

2. Can CURRENT_WATERMARK(event_time) still return null? If so, what are the 
conditions for returning null?

Thanks

Re: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner

2022-02-12 Thread David Anderson
You are probably running with Java 11 (with Java 8 these messages aren't
produced). The Flink docs [1] say

These warnings are considered harmless and will be addressed in future
Flink releases.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.10/#java-11-support

David

On Fri, Feb 11, 2022 at 7:44 AM Антон  wrote:

> Hello,
>
> what could be the reason for warning like this:
>
> WARNING: An illegal reflective access operation has occurred
>
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/var/flink/flink-1.13.2/lib/flink-dist_2.12-1.13.2.jar) to field
> java.lang.String.value
>
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
>
> ?
>


Re: huge number of duplicate numbers after experiencing a crash loop in deserializer

2022-02-12 Thread Frank Dekervel

Hello,

Small heads-up: we found the cause. It had nothing to do with Flink; it 
was a bug in our own code.
We used CEP to detect when senders would stop sending messages (basically 
we used the timeout of a CEP pattern). When generating these timeout 
messages we made inconsistent use of the different timestamps, resulting 
in out-of-order messages, which were then mistreated in a subsequent 
step ...


Frank




On 09.02.22 17:03, Frank Dekervel wrote:


Hello,

Due to a malformed message in our input queue (Kafka), our 
DeserializationSchema threw an exception, making our Flink application 
crash. Since our application was configured to restart, it restarted, 
only to reprocess the same malformed message and crash again.


This happened for a while until we fixed the job and made the 
DeserializationSchema return null on malformed messages. After that, 
we restarted the job from the last successful savepoint (just before 
the malformed message) and we got a huge number of duplicate messages 
generated from our processors (seemingly one message for every time Flink 
restarted).
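
For anyone who lands in the same crash loop, a minimal sketch of the "return null on 
malformed input" fix (assuming JSON payloads; the class name is a placeholder). With the 
collector-based default of DeserializationSchema, a null result is simply skipped, so the 
job keeps running past the bad record:

import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class LenientJsonSchema extends AbstractDeserializationSchema<JsonNode> {

    // ObjectMapper is Serializable, so it can simply travel with the schema instance
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public JsonNode deserialize(byte[] message) throws IOException {
        try {
            return mapper.readTree(message);
        } catch (IOException e) {
            // Malformed record: returning null means the record is skipped instead of
            // throwing, crashing the job and putting it into a restart loop again.
            return null;
        }
    }
}

A nicer variant would route the bad bytes to a dead-letter topic or a side output instead 
of silently dropping them, but the null return is what stops the restart loop.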


In addition to this, one of our map functions had side effects and 
these side effects were also executed a huge number of times. We 
basically have this topology:


kafka source --> stateful process function with event time timers --> 
(slow) map function with side effect (post to external API) --> kafka 
sink (at least once)


We don't use the exactly-once sink (instead we use at-least-once), 
because an occasional duplicate would not harm us and we need low 
latency. However, having a massive number of duplicates is a problem.


So I'm trying to understand how checkpoints/savepoints really work and 
in what situation we could end up having a massive number of 
duplicates. The only way I could think of is the following scenario:


  * the application starts up
  * the stateful process function treats some incoming messages from
kafka and generates some outgoing messages
  * the slow map function starts processing these messages, and at the
same time a checkpoint is saved (somehow without new kafka offsets
???)
  * the application crashes on the malformed input

then again:

  * application restarts
  * the stateful process function treats again the same incoming
messages from kafka, generating exactly the same in flight
messages again (we use deterministic IDs for these messages and we
see the same ID being generated over and over).
  * a checkpoint is saved with more in flight messages, the map
function is slow hence doesn't catch up
  * the application crashes again on the same input.

Are in-flight messages stored in a checkpoint somehow? Is the above 
scenario even possible? (Reading the design of Flink I would think not, 
but then I have no other explanation.) We had this once before in the 
past (then it was a crash in another branch of the same dataflow).


Greetings,
Frank





Re: Apache Flink - Will later events from a table with watermark be propagated and when can CURRENT_WATERMARK be null in that situation

2022-02-12 Thread David Anderson
Flink uses watermarks to indicate when a stream has become complete up
through some point in time. Various operations on streams wait for
watermarks in order to know when they can safely stop waiting for
further input, and so go ahead and produce their results. These
operations include event-time windowing, interval and temporal joins,
pattern matching, and sorting (by timestamp).

Events that are late have timestamps less than or equal to the current
watermark. They have missed their chance to influence the results of those
operations that rely on watermarks for triggering. But otherwise, Flink
doesn't care if events are late or not. It's not that late events are
automatically dropped in all circumstances -- it's just that these temporal
operations won't wait long enough to accommodate their extreme
out-of-order-ness (lateness).

So yes, your ALL_EVENTS view will contain all of the events, including late
ones.

When your job starts running, it takes some time for an initial watermark
to be produced. During that period of time, the current watermark is NULL,
and no events will be considered late.

Hope this helps clarify things.

Regards,
David

On Sat, Feb 12, 2022 at 12:01 AM M Singh  wrote:

> I thought a little more about your references, Martijn, and wanted to
> confirm one thing - the table is specifying the watermark and the
> downstream view needs to check if it wants all events or only the non-late
> events.  Please let me know if my understanding is correct.
>
> Thanks again for your references.
>
> Mans
>
> On Friday, February 11, 2022, 05:02:49 PM EST, M Singh <
> mans2si...@yahoo.com> wrote:
>
>
>
> Hi Martijn:
>
> Thanks for the reference.
>
> My understanding was that if we use a watermark, then any event with an event
> time (in the above example) < event_time - 30 seconds will be dropped
> automatically.
>
> My question [1] is will the downstream (ALL_EVENTS) view which is
> selecting the events from the table receive events which are late ?  If
> late events are dropped at the table level then do we still need the second
> predicate check (ts > CURRENT_WATERMARK(ts)) to filter out late events at
> the view level.
>
> If the table does not drop late events, then will all downstream views/etc
> need to add this check (ts > CURRENT_WATERMARK(ts)) ?
>
> I am still not clear on this concept of whether downstream view need to
> check for late events with this predicate or will they never receive late
> events.
>
> Thanks again for your time.
>
>
> On Friday, February 11, 2022, 01:55:09 PM EST, Martijn Visser <
> mart...@ververica.com> wrote:
>
>
> Hi,
>
> There's a Flink SQL Cookbook recipe on CURRENT_WATERMARK, I think this
> would cover your questions [1].
>
> Best regards,
>
> Martijn
>
> [1]
> https://github.com/ververica/flink-sql-cookbook/blob/main/other-builtin-functions/03_current_watermark/03_current_watermark.md
>
> On Fri, 11 Feb 2022 at 16:45, M Singh  wrote:
>
> Hi:
>
> The flink docs (
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/)
> indicates that the CURRENT_WATERMARK(rowtime) can return null:
>
> Note that this function can return NULL, and you may have to consider
> this case. For example, if you want to filter out late data you can use:
>
> WHERE
>   CURRENT_WATERMARK(ts) IS NULL
>   OR ts > CURRENT_WATERMARK(ts)
>
>
> I have the following questions if the table is defined with a
> watermark, e.g.:
>
> CREATE TABLE `MYEVENTS` (
> `name` STRING,
> `event_time` TIMESTAMP_LTZ(3),
>  ...
> WATERMARK FOR event_time AS event_time - INTERVAL '30' SECONDS)
> WITH (...)
>
>
> 1. If we define the water mark as above, will the late events still be
> propagated to a view or table which is selecting from MYEVENTS table:
>
> CREATE TEMPORARY VIEW `ALL_EVENTS` AS
> SELECT * FROM MYEVENTS;
>
> 2. Can CURRENT_WATERMARK(event_time) still return null ?  If so, what are
> the conditions for returning null ?
>
>
>
> Thanks
>
>
>
>