Re: Sending watermarks into Kafka

2021-12-20 Thread Niels Basjes
I'm reading the Pulsar PIP and noticed another thing to take into account:
multiple applications (with each a different parallelism) that all write
into the same topic.
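For the producer side I'm experimenting with broadcasting the marker into every
partition (the markers carry no key), tagged with the producing application and
its parallelism so consumers can tell the writers apart. A minimal sketch, with a
placeholder payload format and helper class (not the actual code from my test setup):

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

/** Broadcasts a watermark marker record to every partition of the topic. */
public final class WatermarkBroadcaster {

    /** Intended to be called from the sink's writeWatermark(...); the payload format is
     *  just a placeholder: application id, subtask index, total subtasks, timestamp. */
    public static void broadcast(
            KafkaProducer<byte[], byte[]> producer,
            String topic,
            String applicationId,
            int subtaskIndex,
            int totalSubtasks,
            long watermarkTimestamp) {
        byte[] payload =
                String.format("WM;%s;%d;%d;%d",
                                applicationId, subtaskIndex, totalSubtasks, watermarkTimestamp)
                        .getBytes(StandardCharsets.UTF_8);
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        for (PartitionInfo partition : partitions) {
            // No key: target each partition explicitly so every consumer sees the marker.
            producer.send(new ProducerRecord<>(topic, partition.partition(), null, payload));
        }
    }
}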

On Mon, 20 Dec 2021, 10:45 Niels Basjes,  wrote:

> Hi Till,
>
> This morning I also realized what you call an 'effective watermark' is
> indeed what is needed.
> I'm going to read up on what Pulsar has planned.
>
> What I realized is that the consuming application must be aware of the
> parallelism of the producing application, which is independent of the
> partitions in the intermediate transport.
>
> Assume I produce in parallel 2 and have 5 Kafka partitions which I then
> read in parallel 3; then in the consuming (parallel 3) application I must
> wait for watermarks from each original input before I can continue: which
> is 2 in this case.
> Also we must assume that those watermarks are created at different
> timestamps.
> So my current assessment is that the watermark records must include at
> least the timestamp, the number of the thread for this watermark, and the
> total number of threads.
>
> Niels
>
>
> On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann 
> wrote:
>
>> Hi Niels,
>>
>> if you have multiple inputs going into a single Kafka partition then you
>> have to calculate the effective watermark by looking at the min watermark
>> from all inputs. You could insert a Flink operator that takes care of it
>> and then writes to a set of partitions in 1:n relationship. Alternatively,
>> you could take a look at Pulsar that wants to support this functionality
>> out of the box [1].
>>
>> [1] https://github.com/apache/pulsar/issues/12267
>>
>> Cheers,
>> Till
>>
>> On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes  wrote:
>>
>> > Hi,
>> >
>> > About a year ago I spoke at the Flink Forward conference (
>> > https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling
>> development
>> > problems regarding streaming applications and handling the lack of
>> events
>> > in a stream.
>> > Something I spoke about towards the end of this talk was the idea to
>> ship
>> > the watermarks of a Flink topology into the intermediate transport
>> between
>> > applications so you wouldn't need to recreate them.
>> >
>> > At that time it was just an idea, today I'm actually trying to build
>> that
>> > and see if this idea is actually possible.
>> >
>> > So the class of applications I work on usually do a keyBy on something
>> like
>> > a SessionId, SensorId or IP address.
>> > In low traffic scenarios this means that in Kafka some partitions are
>> > completely idle which makes Windows/GroupBy type operations impossible
>> (in
>> > my talk I explain it a lot better).
>> >
>> > I have a test setup right now to play around with this and I'm running
>> into
>> > a bit of a conceptual hurdle for which I'm looking for help.
>> >
>> > My goal is to ship the watermarks from within a topology into Kafka and
>> > then let a follow up application extract those watermarks again and
>> simply
>> > continue.
>> > The new SinkWriter interface has a void writeWatermark(Watermark
>> > watermark) method
>> > that seems intended for this kind of thing.
>> > The basic operations like writing a watermark into Kafka, reading it
>> again
>> > and then recreating the watermark again works in my test setup (very
>> messy
>> > code but it works).
>> >
>> > My hurdle has to do with the combination of
>> > - different parallelism numbers between Flink and Kafka (how do I ship 2
>> > watermarks into 3 partitions)
>> > - the fact that if you do a keyBy (both in Flink and Kafka) there is a
>> > likely mismatch between the Flink 'partition' and the Kafka `partition`.
>> > - processing speed differences between various threads (like session "A"
>> > needs more CPU cycles/time/processing than session "B") will lead to
>> > skewing of the progression between them.
>> > - watermarks in separate threads in a single Flink topology are not
>> > synchronized (they cannot and should not be).
>> >
>> > Has anyone any pointers on possible ways to handle this?
>> >
>> > Right now my only idea is to ship the watermark into all partitions (as
>> > they do not have a key!) and let the consuming application determine the
>> > "real watermark" based on the mix of watermarks coming in from the
>> upstream
>> > threads.
>> >
>> > All suggestions and ideas are appreciated.
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes
>> >
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2021-12-20 Thread Martijn Visser
Hi all,

Really like the discussion on this topic moving forward. I really think
this feature will be much appreciated by the Flink users. What I still have
left to answer/reply to:

-- Good point. If for whatever reason the different taskmanagers can't get
the latest rule, the Operator Coordinator could send a heartbeat to all
taskmanagers with the latest rules and check from the heartbeat response
whether the latest rules of each taskmanager are equal to those of the
Operator Coordinator.

Just to be sure, this indeed would mean that if for whatever reason the
heartbeat times out, it would crash the job, right?

-- We have considered the solution mentioned above. In this solution, I
have some questions about how to guarantee the consistency of the rules
between the TaskManagers. By having a coordinator in the JobManager to
centrally manage the latest rules, the latest rules of all TaskManagers are
consistent with those of the JobManager, so as to avoid the inconsistencies
that may be encountered in the above solution. Can you explain how this
solution guarantees the consistency of the rules?

The consistency that we could guarantee was based on how often each
TaskManager would do a refresh and how often we would accept a refresh to
fail. We set the refresh time to a relatively short one (30 seconds) and
maximum failures to 3. That meant that we could guarantee that rules would
be updated in < 2 minutes or else the job would crash. That was sufficient
for our use cases. This also really depends on how big your cluster is. I
can imagine that if you have a large scale cluster that you want to run,
you don't want to DDOS the backend system where you have your rules stored.
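To make that concrete, a minimal sketch of such a polling refresher (the interval,
failure budget and RuleSource interface are illustrative placeholders, not the
FLIP's API):

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Polls the rule backend periodically; after too many consecutive failures it gives up. */
public class RuleRefresher {

    interface RuleSource {
        List<String> fetchLatestRules() throws Exception;
    }

    private final RuleSource source;
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;
    private volatile List<String> currentRules;

    public RuleRefresher(RuleSource source, int maxConsecutiveFailures) {
        this.source = source;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    public void start(Duration interval) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(this::refresh, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    public List<String> getCurrentRules() {
        return currentRules;
    }

    private void refresh() {
        try {
            currentRules = source.fetchLatestRules();
            consecutiveFailures = 0;
        } catch (Exception e) {
            consecutiveFailures++;
            if (consecutiveFailures > maxConsecutiveFailures) {
                // With a 30s interval and 3 allowed failures, rules are at most ~2 minutes
                // stale before this gives up (a real operator would fail the job here).
                throw new RuntimeException("Could not refresh rules", e);
            }
        }
    }
}

Something like new RuleRefresher(source, 3).start(Duration.ofSeconds(30)) then gives
roughly the "< 2 minutes" guarantee described above.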

-- In summary, the current design is that JobManager tells all TaskManagers
the latest rules through OperatorCoodinator, and will initiate a heartbeat
to check whether the latest rules on each TaskManager are consistent. We
will describe how to deal with the Failover scenario in more detail on FLIP.

Thanks for that. I think having the JobManager tell the TaskManagers the
applicable rules would indeed end up being the best design.

-- about the concerns around consistency raised by Martijn: I think a lot
of those can be mitigated by using an event time timestamp from which the
rules take effect. The reprocessing scenario, for example, is covered by
this. If a pattern processor should become active as soon as possible,
there will still be inconsistencies between Taskmanagers, but "as soon as
possible" is vague anyway, which is why I think that's ok.

I think an event timestamp is indeed a really important one. We also used
that in my previous role, with the ruleActivationTimestamp compared to the
event time (well, actually we used the Kafka logAppend time, because the
event time wasn't always properly set, so we used that time to overwrite
the event time from the event itself).
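A small sketch of that per-rule check (field names are illustrative; the timestamp
compared against can be the event time or, as in our case, the logAppend time):

import java.util.List;
import java.util.stream.Collectors;

class RuleActivation {

    static class Rule {
        final String id;
        final long activationTimestamp; // event-time from which the rule takes effect

        Rule(String id, long activationTimestamp) {
            this.id = id;
            this.activationTimestamp = activationTimestamp;
        }
    }

    /** A rule applies to an event only if the event's timestamp is at or after activation. */
    static List<Rule> applicableRules(List<Rule> rules, long eventTimestamp) {
        return rules.stream()
                .filter(rule -> rule.activationTimestamp <= eventTimestamp)
                .collect(Collectors.toList());
    }
}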

Best regards,

Martijn

On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf  wrote:

> Hi Nicholas, Hi Junfeng,
>
> about the concerns around consistency raised by Martijn: I think a lot of
> those can be mitigated by using an event time timestamp from which the
> rules take effect. The reprocessing scenario, for example, is covered by
> this. If a pattern processor should become active as soon as possible,
> there will still be inconsistencies between Taskmanagers, but "as soon as
> possible" is vague anyway, which is why I think that's ok.
>
> about naming: The naming with "PatternProcessor" sounds good to me. Final
> nit: I would go for CEP#patternProcessors, which would be consistent with
> CEP#pattern.
>
> I am not sure about one of the rejected alternatives:
>
> > Have each subtask of an operator make the update on their own
>
>    - It is hard to achieve consistency.
>    - Though the time interval at which each subtask makes the update can be
>      the same, the absolute time they make the update might be different. For
>      example, one makes updates at 10:00, 10:05, etc., while another does it
>      at 10:01, 10:06. In this case the subtasks might never process data with
>      the same set of pattern processors.
>
>
> I would have thought that it is quite easy to poll for the rules from each
> Subtask at *about *the same time. So, this alone does not seem to be
> enough to rule out this option. I've looped in David Moravek to get his
> opinion of the additional load imposed on the JM.
>
> Thanks,
>
> Konstantin
>
> On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang 
> wrote:
>
> > Hi Yue,
> >
> > Thanks for your feedback on the FLIP. I have addressed your questions and
> > made corresponding explanations as follows:
> >
> > -- About Pattern Updating. If we use PatternProcessorDiscoverer to update
> > the rules, will it increase the load of the JM? For example, if the user wants
> > the updated rule to take effect immediately, which means that we need to
> > set a shorter check interval or there is another scenario when users

[jira] [Created] (FLINK-25387) Introduce ExecNodeMetadata annotation and tooling

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25387:


 Summary: Introduce ExecNodeMetadata annotation and tooling
 Key: FLINK-25387
 URL: https://issues.apache.org/jira/browse/FLINK-25387
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Introduce the annotation and possibly some utilities.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25382) Failure in "Upload Logs" task

2021-12-20 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-25382:
--

 Summary: Failure in "Upload Logs" task
 Key: FLINK-25382
 URL: https://issues.apache.org/jira/browse/FLINK-25382
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Affects Versions: 1.15.0
Reporter: Piotr Nowojski


I don't see any error message, but it seems like uploading the logs has failed:

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27568=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=bb16d35c-fdfe-5139-f244-9492cbd2050b

for the following build:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27568=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=2c7d57b9-7341-5a87-c9af-2cf7cc1a37dc



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25384) Job can not migrate from flink-1.12 to flink-1.13

2021-12-20 Thread tanjialiang (Jira)
tanjialiang created FLINK-25384:
---

 Summary: Job can not migrate from flink-1.12 to flink-1.13
 Key: FLINK-25384
 URL: https://issues.apache.org/jira/browse/FLINK-25384
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Affects Versions: 1.14.0, 1.13.0
Reporter: tanjialiang


The NotNullEnforcer is always added in Flink 1.12. But in this 
[PR|https://issues.apache.org/jira/browse/FLINK-21005], an IF branch was introduced 
to decide whether the NotNullEnforcer is added.

 

So the StreamGraph changes in Flink 1.13, which I think is the reason 
I can't migrate the Flink SQL job from Flink 1.12 to Flink 1.13. :(

 

Solution: make a table sink column NOT NULL, so that a NotNullEnforcer operator is 
added to the stream graph; then restoring from a Flink 1.12 savepoint in 
Flink 1.13 works.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25383) Skip waiting for the final checkpoint if the task does not contain 2-pc operators

2021-12-20 Thread Yun Gao (Jira)
Yun Gao created FLINK-25383:
---

 Summary: Skip waiting for the final checkpoint if the task does not 
contain 2-pc operators
 Key: FLINK-25383
 URL: https://issues.apache.org/jira/browse/FLINK-25383
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Checkpointing
Reporter: Yun Gao






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25386) Harden table serialization in JSON plan

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25386:


 Summary: Harden table serialization in JSON plan
 Key: FLINK-25386
 URL: https://issues.apache.org/jira/browse/FLINK-25386
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Similar to previous subtasks, we should revisit the JSON plan according to 
FLIP-190:

- Consider config option regarding catalog objects
- Helpful exceptions for the unsupported cases



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[DISCUSS] FLIP-203: Incremental savepoints

2021-12-20 Thread Piotr Nowojski
Hi devs,

I would like to start a discussion about a previously announced follow up
of the FLIP-193 [1], namely allowing savepoints to be in native format and
incremental. The changes do not seem invasive. The full proposal is
written down as FLIP-203: Incremental savepoints [2]. Please take a look,
and let me know what you think.

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Semantic


[jira] [Created] (FLINK-25393) Make ConfigMap Name for Leader Election Configurable

2021-12-20 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-25393:


 Summary: Make ConfigMap Name for Leader Election Configurable
 Key: FLINK-25393
 URL: https://issues.apache.org/jira/browse/FLINK-25393
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: Konstantin Knauf


When deploying Flink on Kubernetes (standalone; application mode; 
KubernetesHighAvailabilityServices), I would like to configure the name of the 
ConfigMaps used for leader election via the flink-conf.yaml. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Sending watermarks into Kafka

2021-12-20 Thread Niels Basjes
Hi Till,

This morning I also realized what you call an 'effective watermark' is
indeed what is needed.
I'm going to read up on what Pulsar has planned.

What I realized is that the consuming application must be aware of the
parallelism of the producing application, which is independent of the
partitions in the intermediate transport.

Assume I produce in parallel 2 and have 5 Kafka partitions which I then read
in parallel 3; then in the consuming (parallel 3) application I must wait
for watermarks from each original input before I can continue: which is 2 in
this case.
Also we must assume that those watermarks are created at different
timestamps.
So my current assessment is that the watermark records must include at
least the timestamp, the number of the thread for this watermark, and the
total number of threads.
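As a sketch of the bookkeeping I have in mind on the consuming side (class and
field names are purely illustrative, not a proposal for the actual record format):

import java.util.HashMap;
import java.util.Map;

/** Tracks the latest watermark per producing thread and combines them into an
 *  "effective" watermark once every declared thread has reported at least once. */
public class EffectiveWatermarkTracker {

    private final Map<Integer, Long> latestPerThread = new HashMap<>();
    private int expectedThreads = -1;

    /** Called for every watermark record: (timestamp, thread number, total threads). */
    public void onWatermarkRecord(long timestamp, int threadIndex, int totalThreads) {
        expectedThreads = totalThreads;
        latestPerThread.merge(threadIndex, timestamp, Math::max);
    }

    /** Minimum over all producing threads; undefined until all of them have been seen. */
    public long effectiveWatermark() {
        if (expectedThreads < 0 || latestPerThread.size() < expectedThreads) {
            return Long.MIN_VALUE;
        }
        return latestPerThread.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}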

Niels


On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann  wrote:

> Hi Niels,
>
> if you have multiple inputs going into a single Kafka partition then you
> have to calculate the effective watermark by looking at the min watermark
> from all inputs. You could insert a Flink operator that takes care of it
> and then writes to a set of partitions in 1:n relationship. Alternatively,
> you could take a look at Pulsar that wants to support this functionality
> out of the box [1].
>
> [1] https://github.com/apache/pulsar/issues/12267
>
> Cheers,
> Till
>
> On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes  wrote:
>
> > Hi,
> >
> > About a year ago I spoke at the Flink Forward conference (
> > https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development
> > problems regarding streaming applications and handling the lack of events
> > in a stream.
> > Something I spoke about towards the end of this talk was the idea to ship
> > the watermarks of a Flink topology into the intermediate transport
> between
> > applications so you wouldn't need to recreate them.
> >
> > At that time it was just an idea, today I'm actually trying to build that
> > and see if this idea is actually possible.
> >
> > So the class of applications I work on usually do a keyBy on something
> like
> > a SessionId, SensorId or IP address.
> > In low traffic scenarios this means that in Kafka some partitions are
> > completely idle which makes Windows/GroupBy type operations impossible
> (in
> > my talk I explain it a lot better).
> >
> > I have a test setup right now to play around with this and I'm running
> into
> > a bit of a conceptual hurdle for which I'm looking for help.
> >
> > My goal is to ship the watermarks from within a topology into Kafka and
> > then let a follow up application extract those watermarks again and
> simply
> > continue.
> > The new SinkWriter interface has a void writeWatermark(Watermark
> > watermark) method
> > that seems intended for this kind of thing.
> > The basic operations like writing a watermark into Kafka, reading it
> again
> > and then recreating the watermark again works in my test setup (very
> messy
> > code but it works).
> >
> > My hurdle has to do with the combination of
> > - different parallelism numbers between Flink and Kafka (how do I ship 2
> > watermarks into 3 partitions)
> > - the fact that if you do a keyBy (both in Flink and Kafka) there is a
> > likely mismatch between the Flink 'partition' and the Kafka `partition`.
> > - processing speed differences between various threads (like session "A"
> > needs more CPU cycles/time/processing than session "B") will lead to
> > skewing of the progression between them.
> > - watermarks in separate threads in a single Flink topology are not
> > synchronized (they cannot and should not be).
> >
> > Has anyone any pointers on possible ways to handle this?
> >
> > Right now my only idea is to ship the watermark into all partitions (as
> > they do not have a key!) and let the consuming application determine the
> > "real watermark" based on the mix of watermarks coming in from the
> upstream
> > threads.
> >
> > All suggestions and ideas are appreciated.
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
> >
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


[RESULT][VOTE] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-20 Thread Timo Walther

Hi everyone,

The voting time for FLIP-190: Support Version Upgrades for Table API & 
SQL Programs[1] has passed. I'm closing the vote now.


There were 6 +1 votes, 3 of which are binding:

- Jing Zhang (binding)
- Ingo Bürk (binding)
- Godfrey He (binding)
- Martijn Visser (non-binding)
- Wenlong (non-binding)
- Sergey Nuyanzin (non-binding)

There were no -1 votes.

Thus, FLIP-190 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

[1] https://lists.apache.org/thread/rxxxq0q9v9pmzd2ht9nybpg5vrzyhwx7

Cheers,
Timo


On 16.12.21 12:29, Sergey Nuyanzin wrote:

+1 (non-binding)

On Wed, Dec 15, 2021 at 11:22 AM godfrey he  wrote:


+1 (binding)

Best,
Godfrey

Ingo Bürk wrote on Wed, Dec 15, 2021 at 16:19:


+1 (binding)

Thanks for driving this much needed feature!

On 14.12.21 17:45, Timo Walther wrote:

Hi everyone,

I'd like to start a vote on FLIP-190: Support Version Upgrades for

Table

API & SQL Programs [1] which has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an

objection

or not enough votes.

[1] https://cwiki.apache.org/confluence/x/KZBnCw
[2] https://lists.apache.org/thread/n8v32j6o3d50mpblxydbz82q1q436ob4

Cheers,
Timo









[jira] [Created] (FLINK-25391) Updating existing table factories for mutable table options

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25391:


 Summary: Updating existing table factories for mutable table 
options
 Key: FLINK-25391
 URL: https://issues.apache.org/jira/browse/FLINK-25391
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Reporter: Timo Walther


Update all existing factories for FLINK-25390.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25385) Harden function serialization in JSON plan

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25385:


 Summary: Harden function serialization in JSON plan
 Key: FLINK-25385
 URL: https://issues.apache.org/jira/browse/FLINK-25385
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Similar to FLINK-25230, we should revisit how functions are serialized into the 
JSON plan. 

- No legacy in plan
- No Java serialization in plan
- Consider config option regarding catalog objects
- Helpful exceptions for the unsupported cases
- Use function version 1 for all functions as a first step



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25389) Update JSON plan with new ExecNodeMetadata

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25389:


 Summary: Update JSON plan with new ExecNodeMetadata
 Key: FLINK-25389
 URL: https://issues.apache.org/jira/browse/FLINK-25389
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


This is the first time that we will actually use the annotations.

- Look up by name and version
- Helpful exceptions for the unsupported cases



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25388) Add annotation to all StreamExec nodes

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25388:


 Summary: Add annotation to all StreamExec nodes
 Key: FLINK-25388
 URL: https://issues.apache.org/jira/browse/FLINK-25388
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Add the initial version. Add proper declaration of consumed options etc.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25390) Support mutable options in DynamicTableFactory

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25390:


 Summary: Support mutable options in DynamicTableFactory
 Key: FLINK-25390
 URL: https://issues.apache.org/jira/browse/FLINK-25390
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


Factory update for mutable options.

{code}
DynamicTableFactory {

/* Declares options that can be modified without runtime implications. */
Set<ConfigOption<?>> mutableOptions();

}
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25392) Add new STATEMENT SET syntax

2021-12-20 Thread Timo Walther (Jira)
Timo Walther created FLINK-25392:


 Summary: Add new STATEMENT SET syntax
 Key: FLINK-25392
 URL: https://issues.apache.org/jira/browse/FLINK-25392
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


Support:

{code}
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
{code}

{code}
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
{code}

This time we should add this to the SQL parser. We need to figure out a 
solution for the interactive SQL Client.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Sending watermarks into Kafka

2021-12-20 Thread Till Rohrmann
Hi Niels,

if you have multiple inputs going into a single Kafka partition then you
have to calculate the effective watermark by looking at the min watermark
from all inputs. You could insert a Flink operator that takes care of it
and then writes to a set of partitions in 1:n relationship. Alternatively,
you could take a look at Pulsar that wants to support this functionality
out of the box [1].

[1] https://github.com/apache/pulsar/issues/12267

Cheers,
Till

On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes  wrote:

> Hi,
>
> About a year ago I spoke at the Flink Forward conference (
> https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development
> problems regarding streaming applications and handling the lack of events
> in a stream.
> Something I spoke about towards the end of this talk was the idea to ship
> the watermarks of a Flink topology into the intermediate transport between
> applications so you wouldn't need to recreate them.
>
> At that time it was just an idea, today I'm actually trying to build that
> and see if this idea is actually possible.
>
> So the class of applications I work on usually do a keyBy on something like
> a SessionId, SensorId or IP address.
> In low traffic scenarios this means that in Kafka some partitions are
> completely idle which makes Windows/GroupBy type operations impossible (in
> my talk I explain it a lot better).
>
> I have a test setup right now to play around with this and I'm running into
> a bit of a conceptual hurdle for which I'm looking for help.
>
> My goal is to ship the watermarks from within a topology into Kafka and
> then let a follow up application extract those watermarks again and simply
> continue.
> The new SinkWriter interface has a void writeWatermark(Watermark
> watermark) method
> that seems intended for this kind of thing.
> The basic operations like writing a watermark into Kafka, reading it again
> and then recreating the watermark again works in my test setup (very messy
> code but it works).
>
> My hurdle has to do with the combination of
> - different parallelism numbers between Flink and Kafka (how do I ship 2
> watermarks into 3 partitions)
> - the fact that if you do a keyBy (both in Flink and Kafka) there is a
> likely mismatch between the Flink 'partition' and the Kafka `partition`.
> - processing speed differences between various threads (like session "A"
> needs more CPU cycles/time/processing than session "B") will lead to
> skewing of the progression between them.
> - watermarks in separate threads in a single Flink topology are not
> synchronized (they cannot and should not be).
>
> Has anyone any pointers on possible ways to handle this?
>
> Right now my only idea is to ship the watermark into all partitions (as
> they do not have a key!) and let the consuming application determine the
> "real watermark" based on the mix of watermarks coming in from the upstream
> threads.
>
> All suggestions and ideas are appreciated.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2021-12-20 Thread Konstantin Knauf
Hi Nicholas, Hi Junfeng,

about the concerns around consistency raised by Martijn: I think a lot of
those can be mitigated by using an event time timestamp from which the
rules take effect. The reprocessing scenario, for example, is covered by
this. If a pattern processor should become active as soon as possible,
there will still be inconsistencies between Taskmanagers, but "as soon as
possible" is vague anyway, which is why I think that's ok.

about naming: The naming with "PatternProcessor" sounds good to me. Final
nit: I would go for CEP#patternProcessors, which would be consistent with
CEP#pattern.

I am not sure about one of the rejected alternatives:

> Have each subtask of an operator make the update on their own

   - It is hard to achieve consistency.
   - Though the time interval at which each subtask makes the update can be
     the same, the absolute time they make the update might be different. For
     example, one makes updates at 10:00, 10:05, etc., while another does it at
     10:01, 10:06. In this case the subtasks might never process data with
     the same set of pattern processors.


I would have thought that it is quite easy to poll for the rules from each
Subtask at *about *the same time. So, this alone does not seem to be
enough to rule out this option. I've looped in David Moravek to get his
opinion of the additional load imposed on the JM.
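For example, each subtask could align its polls to the same wall-clock grid; a
sketch, ignoring clock skew between machines:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class AlignedPolling {

    /** Schedules `poll` at the next multiple of `intervalMillis`, so all subtasks
     *  refresh at roughly the same wall-clock instants (e.g. every full minute). */
    static void scheduleAligned(Runnable poll, long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        long initialDelay = intervalMillis - (System.currentTimeMillis() % intervalMillis);
        executor.scheduleAtFixedRate(poll, initialDelay, intervalMillis, TimeUnit.MILLISECONDS);
    }
}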

Thanks,

Konstantin

On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang 
wrote:

> Hi Yue,
>
> Thanks for your feedback on the FLIP. I have addressed your questions and
> made corresponding explanations as follows:
>
> -- About Pattern Updating. If we use PatternProcessorDiscoverer to update
> the rules, will it increase the load of the JM? For example, if the user wants
> the updated rule to take effect immediately, we need to set a shorter check
> interval. Or, in another scenario where users rarely update the pattern, will
> the PatternProcessorDiscoverer spend most of the time doing useless checks?
> Could a lazy update mode be used, in which the pattern is only updated when
> triggered by the user, and nothing is done at other times?
>
> PatternProcessorDiscoverer is a user-defined interface to discover
> PatternProcessor updates. Periodically checking the PatternProcessor in the
> database is one implementation of the PatternProcessorDiscoverer interface,
> which periodically queries the whole PatternProcessor table at a certain
> interval. This implementation indeed has the useless checks, and could
> instead directly integrate the changelog of the table. In addition to
> the implementation that periodically checks the database, there are other
> implementations, such as a discoverer that provides RESTful services
> to receive updates.
>
> -- I still have some confusion about how the Key Generating Operator and
> the CepOperator (Pattern Matching & Processing Operator) work together. If
> there are N PatternProcessors, will the Key Generating Operator generate N
> keyedStreams, and then N CepOperators process each key separately? Or would
> every CepOperator task process all patterns? If so, does the key type
> in each PatternProcessor need to be the same?
>
> Firstly, the Pattern Matching & Processing Operator is not the CepOperator
> at present, because the CepOperator mechanism is based on the NFAState.
> Secondly, if there are N PatternProcessors, the Key Generating Operator
> combines all the keyedStreams with a keyBy() operation, thus the Pattern
> Matching & Processing Operator would process all the patterns. In other
> words, the KeySelector of the PatternProcessor is used for the Key
> Generating Operator, and the Pattern and PatternProcessFunction of the
> PatternProcessor are used for the Pattern Matching & Processing Operator.
> Lastly, the key type in each PatternProcessor is the same, regarded as
> the Object type.
>
> -- Maybe we need to pay attention to this when implementing it. If some Pattern
> has been removed or updated, will the partially matched results in the
> StateBackend be cleaned up, or do we rely on state TTL to clean up these
> expired states?
>
> If a certain Pattern has been removed or updated, the partially matched
> results in the StateBackend will be cleaned up by the next checkpoint. The
> partially matched results don't depend on the state TTL of the
> StateBackend.
>
> 4. Will the PatternProcessorManager keep all the active PatternProcessor
> in memory? We have also Support Multiple Rule and Dynamic Rule Changing.
> But we are facing such a problem, some users’ usage scenarios are that they
> want to have their own pattern for each user_id, which means that there
> could be thousands of patterns, which would make the performance of Pattern
> Matching very poor. We are also trying to solve this problem.
>
> The PatternProcessorManager keeps all the active PatternProcessor in
> memory. For scenarios that they want to have their own pattern for each
> user_id, IMO, is it 

[jira] [Created] (FLINK-25381) JdbcExactlyOnceSinkE2eTest.testInsert fails on AZP

2021-12-20 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25381:
-

 Summary: JdbcExactlyOnceSinkE2eTest.testInsert fails on AZP
 Key: FLINK-25381
 URL: https://issues.apache.org/jira/browse/FLINK-25381
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.15.0
Reporter: Till Rohrmann
 Fix For: 1.15.0


The test {{JdbcExactlyOnceSinkE2eTest.testInsert}} fails on AZP with

{code}
2021-12-20T03:13:06.5415073Z Dec 20 03:13:06 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2021-12-20T03:13:06.5415676Z Dec 20 03:13:06at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
2021-12-20T03:13:06.5416393Z Dec 20 03:13:06at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
2021-12-20T03:13:06.5417081Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
2021-12-20T03:13:06.5417686Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
2021-12-20T03:13:06.5418283Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2021-12-20T03:13:06.5418892Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2021-12-20T03:13:06.5424332Z Dec 20 03:13:06at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
2021-12-20T03:13:06.5425255Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2021-12-20T03:13:06.5425994Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2021-12-20T03:13:06.5426730Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2021-12-20T03:13:06.5427397Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2021-12-20T03:13:06.5428007Z Dec 20 03:13:06at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
2021-12-20T03:13:06.5428649Z Dec 20 03:13:06at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
2021-12-20T03:13:06.5429604Z Dec 20 03:13:06at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
2021-12-20T03:13:06.5430674Z Dec 20 03:13:06at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
2021-12-20T03:13:06.5431696Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2021-12-20T03:13:06.5432744Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2021-12-20T03:13:06.5433645Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2021-12-20T03:13:06.5434697Z Dec 20 03:13:06at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2021-12-20T03:13:06.5435356Z Dec 20 03:13:06at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
2021-12-20T03:13:06.5436145Z Dec 20 03:13:06at 
akka.dispatch.OnComplete.internal(Future.scala:300)
2021-12-20T03:13:06.5436855Z Dec 20 03:13:06at 
akka.dispatch.OnComplete.internal(Future.scala:297)
2021-12-20T03:13:06.5437690Z Dec 20 03:13:06at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
2021-12-20T03:13:06.5438367Z Dec 20 03:13:06at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
2021-12-20T03:13:06.5439147Z Dec 20 03:13:06at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
2021-12-20T03:13:06.5440092Z Dec 20 03:13:06at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
2021-12-20T03:13:06.5441108Z Dec 20 03:13:06at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
2021-12-20T03:13:06.5442086Z Dec 20 03:13:06at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
2021-12-20T03:13:06.5443269Z Dec 20 03:13:06at 
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
2021-12-20T03:13:06.5444506Z Dec 20 03:13:06at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
2021-12-20T03:13:06.5445078Z Dec 20 03:13:06at 
akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
2021-12-20T03:13:06.5445682Z Dec 20 03:13:06at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)

Re: [jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackso

2021-12-20 Thread Galen Warren
I just wanted to check in on this, is there any interest in a PR to fix
this issue? Looking at the master branch, it still looks to be a problem.
Thanks.

On Mon, Dec 6, 2021 at 9:52 AM Galen Warren (Jira)  wrote:

> Galen Warren created FLINK-25197:
> 
>
>  Summary: Using Statefun RequestReplyFunctionBuilder fails
> with Java 8 date/time type `java.time.Duration` not supported by default:
> add Module
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
> to enable handling
>  Key: FLINK-25197
>  URL: https://issues.apache.org/jira/browse/FLINK-25197
>  Project: Flink
>   Issue Type: Bug
>   Components: Stateful Functions
> Affects Versions: statefun-3.1.0
> Reporter: Galen Warren
>  Fix For: statefun-3.1.0
>
>
> When using RequestReplyFunctionBuilder to build a stateful functions job,
> the job fails at runtime with:
>
> Java 8 date/time type `java.time.Duration` not supported by default: add
> Module
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
> to enable handling
>
> It appears this is because, in
> [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|
> https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127],
> a default instance of ObjectMapper is used to serialize the client
> properties, which now include a java.time.Duration. There is a
> [StateFunObjectMapper|
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java]
> class in the project that has customized serde support, but it is not used
> here.
>
> The fix seems to be to:
>  * Use an instance of StateFunObjectMapper to serialize the client
> properties in RequestReplyFunctionBuilder
>  * Modify StateFunObjectMapper to both serialize and deserialize
> instances of java.time.Duration (currently, only deserialization is
> supported)
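(For illustration, this is the kind of mapper configuration that handles
java.time.Duration — shown here with plain Jackson classes; the actual fix would
use the flink-shaded equivalents inside StateFunObjectMapper:)

import java.time.Duration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class DurationSerdeExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Registers serializers and deserializers for java.time types, including Duration.
        mapper.registerModule(new JavaTimeModule());

        String json = mapper.writeValueAsString(Duration.ofSeconds(30));
        Duration roundTripped = mapper.readValue(json, Duration.class);
        System.out.println(json + " -> " + roundTripped);
    }
}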
>
> I've made these changes locally and it seems to fix the problem. Would you
> be interested in a PR? Thanks.
>
>
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.1#820001)
>


[VOTE] Stateful functions 3.1.1 release

2021-12-20 Thread Igal Shilman
Hi everyone,

Please review and vote on the release candidate #2 for the version 3.1.1 of
Apache Flink Stateful Functions, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

This release updates the Flink version to fix the log4j CVEs

**Testing Guideline**

You can find here [1] a page in the project wiki on instructions for
testing.
To cast a vote, it is not necessary to perform all listed checks,
but please mention which checks you have performed when voting.

**Release Overview**

As an overview, the release consists of the following:
a) Stateful Functions canonical source distribution, to be deployed to the
release repository at dist.apache.org
b) Stateful Functions Python SDK distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository
d) New Dockerfiles for the release
e) GoLang SDK tag v3.1.1-rc2

**Staging Areas to Review**

The staging areas containing the above mentioned artifacts are as follows,
for your review:
* All artifacts for a) and b) can be found in the corresponding dev
repository at dist.apache.org [2]
* All artifacts for c) can be found at the Apache Nexus Repository [3]

All artifacts are signed with the key
73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4]

Other links for your review:
* JIRA release notes [5]
* source code tag "release-3.1.1-rc2" [6]
* PR for the new Dockerfiles [7]
* PR for the flink website [8]

**Vote Duration**

The voting time will run for 24 hours. We are targeting this vote to last
until December 21st, 22:00 CET,
or until it is adopted by majority approval, with at least 3 PMC affirmative
votes.

Thanks,
Seth & Igal

[1]
https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.1.1-rc2/
[3] https://repository.apache.org/content/repositories/orgapacheflink-1466
[4] https://dist.apache.org/repos/dist/release/flink/KEYS
[5]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351096==12315522
[6] https://github.com/apache/flink-statefun/tree/release-3.1.1-rc2
[7] https://github.com/apache/flink-statefun-docker/pull/18
[8] https://github.com/apache/flink-web/pull/492


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2021-12-20 Thread Nicholas Jiang
Hi Konstantin, Martijn

Thanks for the detailed feedback in the discussion. What I still have left to 
answer/reply to:

-- Martijn: Just to be sure, this indeed would mean that if for whatever reason 
the heartbeat times out, it would crash the job, right?

IMO, if for whatever reason the heartbeat times out, the PatternProcessor 
consistency between the OperatorCoordinator and the subtasks could not be 
checked, so the job would crash.

-- Konstantin: What I was concerned about is that we basically let users run a 
UserFunction in the OperatorCoordinator, which it does not seem to have been 
designed for.

In general, we have reached an agreement on the design of this FLIP, but there 
are some concerns about whether basically letting users run a UserFunction in 
the OperatorCoordinator is what the OperatorCoordinator was designed for. We 
would like to invite Becket Qin, the author of the OperatorCoordinator, to 
help answer this concern.

Best,
Nicholas Jiang


On 2021/12/20 10:07:14 Martijn Visser wrote:
> Hi all,
> 
> Really like the discussion on this topic moving forward. I really think
> this feature will be much appreciated by the Flink users. What I still have
> left to answer/reply to:
> 
> -- Good point. If for whatever reason the different taskmanagers can't get
> the latest rule, the Operator Coordinator could send a heartbeat to all
> taskmanagers with the latest rules and check the heartbeat response from
> all the taskmanagers whether the latest rules of each taskmanager are equal
> to those of the Operator Coordinator.
> 
> Just to be sure, this indeed would mean that if for whatever reason the
> heartbeat times out, it would crash the job, right?
> 
> -- We have considered the solution mentioned above. In this solution, I
> have some questions about how to guarantee the consistency of the rule
> between the TaskManagers. By having a coordinator in the JobManager to
> centrally manage the latest rules, the latest rules of all TaskManagers are
> consistent with those of the JobManager, so as to avoid the inconsistencies
> that may be encountered in the above solution. Can you introduce how this
> solution guarantees the consistency of the rules?
> 
> The consistency that we could guarantee was based on how often each
> TaskManager would do a refresh and how often we would accept a refresh to
> fail. We set the refresh time to a relatively short one (30 seconds) and
> maximum failures to 3. That meant that we could guarantee that rules would
> be updated in < 2 minutes or else the job would crash. That was sufficient
> for our use cases. This also really depends on how big your cluster is. I
> can imagine that if you have a large scale cluster that you want to run,
> you don't want to DDOS the backend system where you have your rules stored.
> 
> -- In summary, the current design is that JobManager tells all TaskManagers
> the latest rules through the OperatorCoordinator, and will initiate a heartbeat
> to check whether the latest rules on each TaskManager are consistent. We
> will describe how to deal with the failover scenario in more detail in the FLIP.
> 
> Thanks for that. I think having the JobManager tell the TaskManagers the
> applicable rules would indeed end up being the best design.
> 
> -- about the concerns around consistency raised by Martijn: I think a lot
> of those can be mitigated by using an event time timestamp from which the
> rules take effect. The reprocessing scenario, for example, is covered by
> this. If a pattern processor should become active as soon as possible,
> there will still be inconsistencies between Taskmanagers, but "as soon as
> possible" is vague anyway, which is why I think that's ok.
> 
> I think an event timestamp is indeed a really important one. We also used
> that in my previous role, with the ruleActivationTimestamp compared to
> eventtime (well, actually we used Kafka logAppend time because
> eventtime wasn't always properly set so we used that time to overwrite the
> eventtime from the event itself).
> 
> Best regards,
> 
> Martijn
> 
> On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf  wrote:
> 
> > Hi Nicholas, Hi Junfeng,
> >
> > about the concerns around consistency raised by Martijn: I think a lot of
> > those can be mitigated by using an event time timestamp from which the
> > rules take effect. The reprocessing scenario, for example, is covered by
> > this. If a pattern processor should become active as soon as possible,
> > there will still be inconsistencies between Taskmanagers, but "as soon as
> > possible" is vague anyway, which is why I think that's ok.
> >
> > about naming: The naming with "PatternProcessor" sounds good to me. Final
> > nit: I would go for CEP#patternProcessors, which would be consistent with
> > CEP#pattern.
> >
> > I am not sure about one of the rejected alternatives:
> >
> > > Have each subtask of an operator make the update on their own
> >
> >    - It is hard to achieve consistency.

[jira] [Created] (FLINK-25398) Show complete stacktrace when requesting thread dump

2021-12-20 Thread Junfan Zhang (Jira)
Junfan Zhang created FLINK-25398:


 Summary: Show complete stacktrace when requesting thread dump
 Key: FLINK-25398
 URL: https://issues.apache.org/jira/browse/FLINK-25398
 Project: Flink
  Issue Type: Improvement
Reporter: Junfan Zhang






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Stateful functions 3.1.1 release

2021-12-20 Thread Tzu-Li (Gordon) Tai
+1 (binding)

- Checked hash and signatures
- Checked diff contains Flink upgrade
- mvn clean install with e2e

Thanks,
Gordon

On Mon, Dec 20, 2021, 13:55 Seth Wiesman  wrote:

> +1 (non-binding)
>
> - Verified signatures
> - Checked diff
> - Checked site PR
> - Build from source and ran e2e tests
>
> Seth
>
> On Mon, Dec 20, 2021 at 10:59 AM Igal Shilman  wrote:
>
> > Hi everyone,
> >
> > Please review and vote on the release candidate #2 for the version 3.1.1
> of
> > Apache Flink Stateful Functions, as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > This release updates the Flink version to fix the log4j CVEs
> >
> > **Testing Guideline**
> >
> > You can find here [1] a page in the project wiki on instructions for
> > testing.
> > To cast a vote, it is not necessary to perform all listed checks,
> > but please mention which checks you have performed when voting.
> >
> > **Release Overview**
> >
> > As an overview, the release consists of the following:
> > a) Stateful Functions canonical source distribution, to be deployed to
> the
> > release repository at dist.apache.org
> > b) Stateful Functions Python SDK distributions to be deployed to PyPI
> > c) Maven artifacts to be deployed to the Maven Central Repository
> > d) New Dockerfiles for the release
> > e) GoLang SDK tag v3.1.1-rc2
> >
> > **Staging Areas to Review**
> >
> > The staging areas containing the above mentioned artifacts are as
> follows,
> > for your review:
> > * All artifacts for a) and b) can be found in the corresponding dev
> > repository at dist.apache.org [2]
> > * All artifacts for c) can be found at the Apache Nexus Repository [3]
> >
> > All artifacts are signed with the key
> > 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4]
> >
> > Other links for your review:
> > * JIRA release notes [5]
> > * source code tag "release-3.1.1-rc2" [6]
> > * PR for the new Dockerfiles [7]
> > * PR for the flink website [8]
> >
> > **Vote Duration**
> >
> > The voting time will run for 24 hours. We are targeting this vote to last
> > until December 21st, 22:00 CET,
> > or until it is adopted by majority approval, with at least 3 PMC affirmative
> > votes.
> >
> > Thanks,
> > Seth & Igal
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
> > [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.1.1-rc2/
> > [3]
> https://repository.apache.org/content/repositories/orgapacheflink-1466
> > [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [5]
> >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351096==12315522
> > [6] https://github.com/apache/flink-statefun/tree/release-3.1.1-rc2
> > [7] https://github.com/apache/flink-statefun-docker/pull/18
> > [8] https://github.com/apache/flink-web/pull/492
> >
>


[jira] [Created] (FLINK-25396) lookupjoin source table for pre-partitioning

2021-12-20 Thread HunterHunter (Jira)
HunterHunter created FLINK-25396:


 Summary: lookupjoin source table for pre-partitioning
 Key: FLINK-25396
 URL: https://issues.apache.org/jira/browse/FLINK-25396
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: HunterHunter


When we perform external lookups, we want to partition by the join key first, so 
that the same key always ends up in the same task. This reduces the number of 
queries and lets each task cache only its share of the external data instead of 
all of it.

Example: select * from sourceTable t1 LEFT JOIN lookuptable FOR SYSTEM_TIME AS 
OF t1.proctime as t2 ON t1.msg = t2.word

Execution Plan like:
{code:java}
== Optimized Execution Plan ==
Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) 
AS proctime, word])
+- LookupJoin(table=[default_catalog.default_database.hbaselookup], 
joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, 
offset, rowtime, msg, uid, proctime, word])
   +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, 
Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
      +- TableSourceScan(table=[[default_catalog, default_database, test, 
watermark=[-($0, 1:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, 
offset])
{code}
For this optimization, I added a hint 
option (lookup.join.pre-partition) and a rule that generates an 
exchange, so that the input is pre-partitioned by the join key when the external 
data is fetched synchronously.

select * from test t1 LEFT JOIN hbaselookup /*+ 
OPTIONS('lookup.join.pre-partition'='true') */ FOR SYSTEM_TIME AS OF 
t1.proctime as t2 ON t1.msg = t2.word
{code:java}
== Optimized Execution Plan ==
Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) 
AS proctime, word])
+- LookupJoin(table=[default_catalog.default_database.hbaselookup], 
joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, 
offset, rowtime, msg, uid, proctime, word])
   +- Exchange(distribution=[hash[msg]])
      +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, 
Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
         +- TableSourceScan(table=[[default_catalog, default_database, test, 
watermark=[-($0, 1:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, 
offset])
 {code}
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25397) grouped_execution

2021-12-20 Thread ZhuoYu Chen (Jira)
ZhuoYu Chen created FLINK-25397:
---

 Summary: grouped_execution
 Key: FLINK-25397
 URL: https://issues.apache.org/jira/browse/FLINK-25397
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Legacy Planner, Table SQL / Planner, Table 
SQL / Runtime
Affects Versions: 1.15.0
Reporter: ZhuoYu Chen


Support grouped (bucketed) execution: two tables (orders, orders_item) are divided 
into buckets based on the same field (orderid) and the same number 
of buckets. When joining by order id, the join and aggregation calculations can be 
performed independently per bucket, because matching order ids of both tables end 
up in buckets with the same ids.
This has several advantages. 1. Whenever a bucket of data has been computed, the 
memory occupied by this bucket can be released immediately, so memory 
consumption can be limited by controlling the number of buckets processed in 
parallel.
2. It avoids a lot of shuffling.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackso

2021-12-20 Thread Seth Wiesman
Hi Galen,

Sorry for the late reply, a lot of people are on sporadic schedules with
the holidays. A PR would be very welcome! I've gone ahead and assigned you
the Jira. And I'll watch the repo to review your pr.

cheers,

Seth

On Mon, Dec 20, 2021 at 8:24 AM Galen Warren 
wrote:

> I just wanted to check in on this, is there any interest in a PR to fix
> this issue? Looking at the master branch, it still looks to be a problem.
> Thanks.
>
> On Mon, Dec 6, 2021 at 9:52 AM Galen Warren (Jira) 
> wrote:
>
> > Galen Warren created FLINK-25197:
> > 
> >
> >  Summary: Using Statefun RequestReplyFunctionBuilder fails
> > with Java 8 date/time type `java.time.Duration` not supported by default:
> > add Module
> >
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
> > to enable handling
> >  Key: FLINK-25197
> >  URL: https://issues.apache.org/jira/browse/FLINK-25197
> >  Project: Flink
> >   Issue Type: Bug
> >   Components: Stateful Functions
> > Affects Versions: statefun-3.1.0
> > Reporter: Galen Warren
> >  Fix For: statefun-3.1.0
> >
> >
> > When using RequestReplyFunctionBuilder to build a stateful functions job,
> > the job fails at runtime with:
> >
> > Java 8 date/time type `java.time.Duration` not supported by default: add
> > Module
> >
> "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310"
> > to enable handling
> >
> > It appears this is because, in
> > [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode|
> >
> https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127
> ],
> > a default instance of ObjectMapper is used to serialize the client
> > properties, which now include a java.time.Duration. There is a
> > [StateFunObjectMapper|
> >
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
> ]
> > class in the project that has customized serde support, but it is not
> used
> > here.
> >
> > The fix seems to be to:
> >  * Use an instance of StateFunObjectMapper to serialize the client
> > properties in RequestReplyFunctionBuilder
> >  * Modify StateFunObjecdtMapper to both serialize and deserialize
> > instances of java.time.Duration (currently, only deserialization is
> > supported)
> >
> > I've made these changes locally and it seems to fix the problem. Would
> you
> > be interested in a PR? Thanks.
> >
> >
> >
> >
> >
> > --
> > This message was sent by Atlassian Jira
> > (v8.20.1#820001)
> >
>
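
For reference, a minimal sketch of the remedy named in the error message:
registering the jackson-datatype-jsr310 module makes java.time.Duration
serializable. The plain (non-shaded) Jackson packages are used here purely for
illustration; the actual fix described above goes through StateFunObjectMapper
and the shaded org.apache.flink.shaded.jackson2 packages.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Duration;

public class DurationSerdeSketch {
    public static void main(String[] args) throws Exception {
        // A default ObjectMapper rejects java.time.Duration with the error quoted above.
        // Registering the JSR-310 module enables Java 8 date/time handling.
        ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());

        String json = mapper.writeValueAsString(Duration.ofSeconds(30)); // e.g. 30.000000000
        Duration roundTrip = mapper.readValue(json, Duration.class);
        System.out.println(json + " -> " + roundTrip);
    }
}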


Re: [DISCUSS] FLIP-203: Incremental savepoints

2021-12-20 Thread Konstantin Knauf
Hi Piotr,

Thanks a lot for starting the discussion. Big +1.

In my understanding, this FLIP introduces the snapshot format as a *really*
user facing concept. IMO it is important that we document

a) that it is no longer the checkpoint/savepoint characteristic that
determines the kind of changes that a snapshot allows (user code, state
schema evolution, topology changes), but that this now becomes a property of
the format, regardless of whether it is a savepoint or a checkpoint
b) the exact changes that each format allows (code, state schema, topology,
state backend, max parallelism)

In this context: will the native format support state schema evolution? If
not, I am not sure we can let the format default to native.

Thanks,

Konstantin


On Mon, Dec 20, 2021 at 2:09 PM Piotr Nowojski  wrote:

> Hi devs,
>
> I would like to start a discussion about a previously announced follow-up
> of FLIP-193 [1], namely allowing savepoints to be in native format and
> incremental. The changes do not seem invasive. The full proposal is
> written down as FLIP-203: Incremental savepoints [2]. Please take a look,
> and let me know what you think.
>
> Best,
> Piotrek
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Semantic
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Sending watermarks into Kafka

2021-12-20 Thread Matthias J. Sax

I think this problem should be tackled inside Kafka, not Flink.

Kafka already has internal control messages to write transaction 
markers. Those could be extended to carry watermark information. It 
would be best to generalize those as "user control messages" and 
watermarks could just be one application.


In addition, we might need something like a "producer group" to track 
how many producers are writing into a partition: this would allow 
informing downstream consumers how many different watermarks they need to track.
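
As a rough illustration of that tracking (this is not an existing Kafka or
Flink API; the assumption is that each watermark record carries the producing
thread's id, the total number of producers, and a timestamp), a consumer-side
sketch that derives the effective watermark as the minimum over the latest
watermark seen from each producer:

import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Sketch only: tracks per-producer watermarks and derives the effective watermark. */
public class EffectiveWatermarkTracker {

    private final Map<Integer, Long> latestPerProducer = new HashMap<>();
    private final int expectedProducers;

    public EffectiveWatermarkTracker(int expectedProducers) {
        this.expectedProducers = expectedProducers;
    }

    /** Records a watermark from one producer and returns the effective watermark, if known. */
    public OptionalLong onWatermark(int producerId, long timestamp) {
        latestPerProducer.merge(producerId, timestamp, Math::max);
        if (latestPerProducer.size() < expectedProducers) {
            // Not every producer has reported yet, so the effective watermark is unknown.
            return OptionalLong.empty();
        }
        // The effective watermark is the minimum of the latest watermarks of all producers.
        return OptionalLong.of(
                latestPerProducer.values().stream().mapToLong(Long::longValue).min().getAsLong());
    }
}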


It's not an easy problem to solve, but trying to solve it at the processing 
layer without integrating with the storage layer is even harder.


-Matthias

On 12/20/21 01:57, Niels Basjes wrote:

I'm reading the Pulsar PIP and noticed another thing to take into account:
multiple applications (with each a different parallelism) that all write
into the same topic.






Re: [jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackso

2021-12-20 Thread Galen Warren
Thanks for getting back to me and Happy Holidays! I'll open a PR soon, it's
a simple fix.

On Mon, Dec 20, 2021 at 9:42 AM Seth Wiesman  wrote:

> Hi Galen,
>
> Sorry for the late reply, a lot of people are on sporadic schedules with
> the holidays. A PR would be very welcome! I've gone ahead and assigned you
> the Jira. And I'll watch the repo to review your pr.
>
> cheers,
>
> Seth
>


[jira] [Created] (FLINK-25394) [Flink-ML] Upgrade log4j to 2.17.0 to address CVE-2021-45105

2021-12-20 Thread Abdelrahman (Jira)
Abdelrahman created FLINK-25394:
---

 Summary: [Flink-ML] Upgrade log4j to 2.17.0 to address 
CVE-2021-45105
 Key: FLINK-25394
 URL: https://issues.apache.org/jira/browse/FLINK-25394
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.14.2
Reporter: Abdelrahman


Apache Log4j2 versions 2.0-alpha1 through 2.16.0 did not protect from 
uncontrolled recursion from self-referential lookups. When the logging 
configuration uses a non-default Pattern Layout with a Context Lookup (for 
example, $${ctx:loginId}), attackers with control over Thread Context Map (MDC) 
input data can craft malicious input data that contains a recursive lookup, 
resulting in a StackOverflowError that will terminate the process. This is also 
known as a DOS (Denial of Service) attack.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25395) Incremental shared state might be discarded by TM

2021-12-20 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-25395:
-

 Summary: Incremental shared state might be discarded by TM
 Key: FLINK-25395
 URL: https://issues.apache.org/jira/browse/FLINK-25395
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Roman Khachatryan
 Fix For: 1.15.0


Extracting from [FLINK-25185 
discussion|https://issues.apache.org/jira/browse/FLINK-25185?focusedCommentId=17462639&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17462639]

On checkpoint abortion, or on any failure in AsyncCheckpointRunnable,
the state is discarded, in particular the shared (incremental) state.

Since FLINK-24611, this creates a problem because shared state can be re-used 
for future checkpoints. 

Needs confirmation.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Stateful functions 3.1.1 release

2021-12-20 Thread Seth Wiesman
+1 (non-binding)

- Verified signatures
- Checked diff
- Checked site PR
- Build from source and ran e2e tests

Seth

On Mon, Dec 20, 2021 at 10:59 AM Igal Shilman  wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version 3.1.1 of
> Apache Flink Stateful Functions, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> This release updates the Flink version to fix the log4j CVEs
>
> **Testing Guideline**
>
> You can find here [1] a page in the project wiki on instructions for
> testing.
> To cast a vote, it is not necessary to perform all listed checks,
> but please mention which checks you have performed when voting.
>
> **Release Overview**
>
> As an overview, the release consists of the following:
> a) Stateful Functions canonical source distribution, to be deployed to the
> release repository at dist.apache.org
> b) Stateful Functions Python SDK distributions to be deployed to PyPI
> c) Maven artifacts to be deployed to the Maven Central Repository
> d) New Dockerfiles for the release
> e) GoLang SDK tag v3.1.1-rc2
>
> **Staging Areas to Review**
>
> The staging areas containing the above mentioned artifacts are as follows,
> for your review:
> * All artifacts for a) and b) can be found in the corresponding dev
> repository at dist.apache.org [2]
> * All artifacts for c) can be found at the Apache Nexus Repository [3]
>
> All artifacts are signed with the key
> 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4]
>
> Other links for your review:
> * JIRA release notes [5]
> * source code tag "release-3.1.1-rc2" [6]
> * PR for the new Dockerfiles [7]
> * PR for the flink website [8]
>
> **Vote Duration**
>
> The voting time will run for 24 hours. We are targeting this vote to last
> until December 21st, 22:00 CET, or until it is adopted by majority approval,
> with at least 3 PMC affirmative votes.
>
> Thanks,
> Seth & Igal
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.1.1-rc2/
> [3] https://repository.apache.org/content/repositories/orgapacheflink-1466
> [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> [5]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351096&styleName=&projectId=12315522
> [6] https://github.com/apache/flink-statefun/tree/release-3.1.1-rc2
> [7] https://github.com/apache/flink-statefun-docker/pull/18
> [8] https://github.com/apache/flink-web/pull/492
>