Re: Sending watermarks into Kafka
I'm reading the Pulsar PIP and noticed another thing to take into account: multiple applications (each with a different parallelism) that all write into the same topic.

On Mon, 20 Dec 2021, 10:45 Niels Basjes wrote: > Hi Till, > > This morning I also realized what you call an 'effective watermark' is > indeed what is needed. > I'm going to read up on what Pulsar has planned. > > What I realized is that the consuming application must be aware of the > parallelism of the producing application, which is independent of the > partitions in the intermediate transport. > > Assume I produce in parallel 2 and have 5 kafka partition which I then > read in parallel 3; then in the consuming (parallel 3) application I must > wait for watermarks from each original input before I can continue: which > is 2 > Also we must assume that those watermarks are created at different > timestamps. > So my current assessment is that the watermark records must include at > least the timestamp, the number of the thread for this watermark and the > total number of threads . > > Niels > > > On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann > wrote: > >> Hi Niels, >> >> if you have multiple inputs going into a single Kafka partition then you >> have to calculate the effective watermark by looking at the min watermark >> from all inputs. You could insert a Flink operator that takes care of it >> and then writes to a set of partitions in 1:n relationship. Alternatively, >> you could take a look at Pulsar that wants to support this functionality >> out of the box [1]. >> >> [1] https://github.com/apache/pulsar/issues/12267 >> >> Cheers, >> Till >> >> On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes wrote: >> >> > Hi, >> > >> > About a year ago I spoke at the Flink Forward conference ( >> > https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling >> development >> > problems regarding streaming applications and handling the lack of >> events >> > in a stream. 
>> > Something I spoke about towards the end of this talk was the idea to >> ship >> > the watermarks of a Flink topology into the intermediate transport >> between >> > applications so you wouldn't need to recreate them. >> > >> > At that time it was just an idea, today I'm actually trying to build >> that >> > and see if this idea is actually possible. >> > >> > So the class of applications I work on usually do a keyBy on something >> like >> > a SessionId, SensorId or IP address. >> > In low traffic scenarios this means that in Kafka some partitions are >> > completely idle which makes Windows/GroupBy type operations impossible >> (in >> > my talk I explain it a lot better). >> > >> > I have a test setup right now to play around with this and I'm running >> into >> > a bit of a conceptual hurdle for which I'm looking for help. >> > >> > My goal is to ship the watermarks from within a topology into Kafka and >> > then let a follow up application extract those watermarks again and >> simply >> > continue. >> > The new SinkWriter interface has a void writeWatermark(Watermark >> > watermark) method >> > that seems intended for this kind of thing. >> > The basic operations like writing a watermark into Kafka, reading it >> again >> > and then recreating the watermark again works in my test setup (very >> messy >> > code but it works). >> > >> > My hurdle has to do with the combination of >> > - different parallelism numbers between Flink and Kafka (how do I ship 2 >> > watermarks into 3 partitions) >> > - the fact that if you do a keyBy (both in Flink and Kafka) there is a >> > likely mismatch between the Flink 'partition' and the Kafka `partition`. >> > - processing speed differences between various threads (like session "A" >> > needs more CPU cycles/time/processing than session "B") will lead to >> > skewing of the progression between them. >> > - watermarks in separate threads in a single Flink topology are not >> > synchronized (they cannot and should not be). 
>> > >> > Has anyone any pointers on possible ways to handle this? >> > >> > Right now my only idea is to ship the watermark into all partitions (as >> > they do not have a key!) and let the consuming application determine the >> > "real watermark" based on the mix of watermarks coming in from the >> upstream >> > threads. >> > >> > All suggestions and ideas are appreciated. >> > >> > -- >> > Best regards / Met vriendelijke groeten, >> > >> > Niels Basjes >> > >> > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes >
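The consumer-side bookkeeping sketched in this thread (ship each producer thread's watermark to all partitions, track the latest watermark per thread, and take the minimum once every thread has reported) can be illustrated in plain Java. The class and method names below are made up for illustration and are not part of any Flink or Kafka API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Derives the "effective watermark" on the consuming side from the
 * per-thread watermark records shipped through Kafka. A record carries
 * the emitting thread's index, the total number of producer threads and
 * the watermark timestamp. The effective watermark is only defined once
 * every producer thread has reported at least once; it is then the
 * minimum over the latest timestamp seen per thread.
 */
final class EffectiveWatermarkTracker {
    private final Map<Integer, Long> latestPerThread = new HashMap<>();
    private int expectedThreads = -1;

    /** Consume one watermark record (threadIndex out of totalThreads, at timestamp). */
    void onWatermark(int threadIndex, int totalThreads, long timestamp) {
        expectedThreads = totalThreads;
        // Watermarks may arrive out of order across partitions; keep the max per thread.
        latestPerThread.merge(threadIndex, timestamp, Math::max);
    }

    /** Empty until all producer threads have reported; then min over the latest per-thread values. */
    OptionalLong effectiveWatermark() {
        if (expectedThreads < 0 || latestPerThread.size() < expectedThreads) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(latestPerThread.values().stream()
                .mapToLong(Long::longValue).min().getAsLong());
    }
}
```

Note that this deliberately stalls until all upstream threads have reported, which mirrors the "wait for watermarks from each original input" requirement above; an idle upstream thread would need an idleness timeout on top of this.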
Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)
Hi all,

Really like the discussion on this topic moving forward. I really think this feature will be much appreciated by the Flink users. What I still have left to answer/reply to:

-- Good point. If for whatever reason the different taskmanagers can't get the latest rule, the Operator Coordinator could send a heartbeat to all taskmanagers with the latest rules and check from the heartbeat responses of all the taskmanagers whether the latest rules of each taskmanager are equal to those of the Operator Coordinator.

Just to be sure, this indeed would mean that if for whatever reason the heartbeat times out, it would crash the job, right?

-- We have considered the solution mentioned above. In this solution, I have some questions about how to guarantee the consistency of the rules between the TaskManagers. By having a coordinator in the JobManager to centrally manage the latest rules, the latest rules of all TaskManagers are consistent with those of the JobManager, so as to avoid the inconsistencies that may be encountered in the above solution. Can you introduce how this solution guarantees the consistency of the rules?

The consistency that we could guarantee was based on how often each TaskManager would do a refresh and how many failed refreshes we would accept. We set the refresh time to a relatively short one (30 seconds) and the maximum number of failures to 3. That meant that we could guarantee that rules would be updated in < 2 minutes or else the job would crash. That was sufficient for our use cases. This also really depends on how big your cluster is. I can imagine that if you have a large scale cluster that you want to run, you don't want to DDOS the backend system where you have your rules stored.

-- In summary, the current design is that the JobManager tells all TaskManagers the latest rules through the OperatorCoordinator, and will initiate a heartbeat to check whether the latest rules on each TaskManager are consistent. 
We will describe how to deal with the Failover scenario in more detail on FLIP. Thanks for that. I think having the JobManager tell the TaskManagers the applicable rules would indeed end up being the best design. -- about the concerns around consistency raised by Martijn: I think a lot of those can be mitigated by using an event time timestamp from which the rules take effect. The reprocessing scenario, for example, is covered by this. If a pattern processor should become active as soon as possible, there will still be inconsistencies between Taskmanagers, but "as soon as possible" is vague anyway, which is why I think that's ok. I think an event timestamp is indeed a really important one. We also used that in my previous role, with the ruleActivationTimestamp compared to eventtime (well, actually we used Kafka logAppend time because eventtime wasn't always properly set so we used that time to overwrite the eventtime from the event itself). Best regards, Martijn On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf wrote: > Hi Nicholas, Hi Junfeng, > > about the concerns around consistency raised by Martijn: I think a lot of > those can be mitigated by using an event time timestamp from which the > rules take effect. The reprocessing scenario, for example, is covered by > this. If a pattern processor should become active as soon as possible, > there will still be inconsistencies between Taskmanagers, but "as soon as > possible" is vague anyway, which is why I think that's ok. > > about naming: The naming with "PatternProcessor" sounds good to me. Final > nit: I would go for CEP#patternProccessors, which would be consistent with > CEP#pattern. > > I am not sure about one of the rejected alternatives: > > > Have each subtask of an operator make the update on their own > >- > >It is hard to achieve consistency. >- > > Though the time interval that each subtask makes the update can be > the same, the absolute time they make the update might be different. 
> For > example, one makes updates at 10:00, 10:05, etc, while another does > it at > 10:01, 10:06. In this case the subtasks might never processing data > with > the same set of pattern processors. > > > I would have thought that it is quite easy to poll for the rules from each > Subtask at *about *the same time. So, this alone does not seem to be > enough to rule out this option. I've looped in David Moravek to get his > opinion of the additional load imposed on the JM. > > Thanks, > > Konstantin > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang > wrote: > > > Hi Yue, > > > > Thanks for your feedback of the FLIP. I have addressed your questions and > > made a corresponding explanation as follows: > > > > -- About Pattern Updating. If we use PatternProcessoerDiscoverer to > update > > the rules, will it increase the load of JM? For example, if the user > wants > > the updated rule to take effect immediately,, which means that we need to > > set a shorter check interval or there is another scenario when users
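The heartbeat-based consistency check summarized above (the coordinator broadcasts a versioned rule set and verifies via heartbeat responses that every TaskManager applied it, failing the job otherwise) could look roughly like the following in plain Java. All names here are hypothetical; FLIP-200 does not define these classes, and a real implementation would live inside an OperatorCoordinator:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the rule-consistency check discussed above: the coordinator
 * remembers the version of the last rule set it broadcast, records the
 * rule version each subtask reports in its heartbeat response, and can
 * then decide whether all subtasks have caught up.
 */
final class RuleVersionCoordinator {
    private long currentVersion;
    private final Map<Integer, Long> appliedVersionPerSubtask = new HashMap<>();

    /** Called when a new rule set (with a monotonically increasing version) is broadcast. */
    void onRulesBroadcast(long version) {
        currentVersion = version;
    }

    /** Called with each heartbeat response carrying the subtask's applied rule version. */
    void onHeartbeat(int subtaskIndex, long appliedVersion) {
        appliedVersionPerSubtask.put(subtaskIndex, appliedVersion);
    }

    /** True once every expected subtask has confirmed the current version. */
    boolean allConsistent(int parallelism) {
        if (appliedVersionPerSubtask.size() < parallelism) {
            return false;
        }
        return appliedVersionPerSubtask.values().stream()
                .allMatch(v -> v == currentVersion);
    }
}
```

In the design discussed above, a heartbeat deadline would be layered on top of `allConsistent`: if consistency is not reached before the timeout, the job fails rather than running with mixed rule sets.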
[jira] [Created] (FLINK-25387) Introduce ExecNodeMetadata annotation and tooling
Timo Walther created FLINK-25387: Summary: Introduce ExecNodeMetadata annotation and tooling Key: FLINK-25387 URL: https://issues.apache.org/jira/browse/FLINK-25387 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Timo Walther Introduce the annotation and possibly some utilities. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25382) Failure in "Upload Logs" task
Piotr Nowojski created FLINK-25382: -- Summary: Failure in "Upload Logs" task Key: FLINK-25382 URL: https://issues.apache.org/jira/browse/FLINK-25382 Project: Flink Issue Type: Bug Components: Test Infrastructure Affects Versions: 1.15.0 Reporter: Piotr Nowojski I don't see any error message, but it seems like uploading the logs has failed: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27568=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=bb16d35c-fdfe-5139-f244-9492cbd2050b for the following build: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27568=logs=2c3cbe13-dee0-5837-cf47-3053da9a8a78=2c7d57b9-7341-5a87-c9af-2cf7cc1a37dc -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25384) Job can not migrate from flink-1.12 to flink-1.13
tanjialiang created FLINK-25384: --- Summary: Job can not migrate from flink-1.12 to flink-1.13 Key: FLINK-25384 URL: https://issues.apache.org/jira/browse/FLINK-25384 Project: Flink Issue Type: Technical Debt Components: Table SQL / Planner Affects Versions: 1.14.0, 1.13.0 Reporter: tanjialiang The NotNullEnforcer was always added in flink-1.12, but this [PR|https://issues.apache.org/jira/browse/FLINK-21005] introduced an IF branch to decide whether the NotNullEnforcer is added, so the StreamGraph changed in flink-1.13. I think this is the reason I can't migrate the Flink SQL job from flink-1.12 to flink-1.13. :( Solution: make a table sink's column NOT NULL, so that a NotNullEnforcer operator is added into the stream graph; with that, restoring from a flink-1.12 savepoint works in flink-1.13. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25383) Skip waiting for the final checkpoint if the task does not contain 2-pc operators
Yun Gao created FLINK-25383: --- Summary: Skip waiting for the final checkpoint if the task does not contain 2-pc operators Key: FLINK-25383 URL: https://issues.apache.org/jira/browse/FLINK-25383 Project: Flink Issue Type: Sub-task Components: Runtime / Checkpointing Reporter: Yun Gao -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25386) Harden table serialization in JSON plan
Timo Walther created FLINK-25386: Summary: Harden table serialization in JSON plan Key: FLINK-25386 URL: https://issues.apache.org/jira/browse/FLINK-25386 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Timo Walther Similar to previous subtasks, we should revisit the JSON plan according to FLIP-190: - Consider config option regarding catalog objects - Helpful exceptions for the unsupported cases -- This message was sent by Atlassian Jira (v8.20.1#820001)
[DISCUSS] FLIP-203: Incremental savepoints
Hi devs, I would like to start a discussion about a previously announced follow-up of FLIP-193 [1], namely allowing savepoints to be in native format and incremental. The changes do not seem invasive. The full proposal is written down as FLIP-203: Incremental savepoints [2]. Please take a look and let me know what you think. Best, Piotrek [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Semantic
[jira] [Created] (FLINK-25393) Make ConfigMap Name for Leader Election Configurable
Konstantin Knauf created FLINK-25393: Summary: Make ConfigMap Name for Leader Election Configurable Key: FLINK-25393 URL: https://issues.apache.org/jira/browse/FLINK-25393 Project: Flink Issue Type: Improvement Components: Deployment / Kubernetes Reporter: Konstantin Knauf When deploying Flink on Kubernetes (standalone; application mode; KubernetesHighAvailabilityServices), I would like to configure the name of the ConfigMaps used for leader election via the flink-conf.yaml. -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: Sending watermarks into Kafka
Hi Till, This morning I also realized what you call an 'effective watermark' is indeed what is needed. I'm going to read up on what Pulsar has planned. What I realized is that the consuming application must be aware of the parallelism of the producing application, which is independent of the partitions in the intermediate transport. Assume I produce in parallel 2 and have 5 kafka partition which I then read in parallel 3; then in the consuming (parallel 3) application I must wait for watermarks from each original input before I can continue: which is 2 Also we must assume that those watermarks are created at different timestamps. So my current assessment is that the watermark records must include at least the timestamp, the number of the thread for this watermark and the total number of threads . Niels On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann wrote: > Hi Niels, > > if you have multiple inputs going into a single Kafka partition then you > have to calculate the effective watermark by looking at the min watermark > from all inputs. You could insert a Flink operator that takes care of it > and then writes to a set of partitions in 1:n relationship. Alternatively, > you could take a look at Pulsar that wants to support this functionality > out of the box [1]. > > [1] https://github.com/apache/pulsar/issues/12267 > > Cheers, > Till > > On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes wrote: > > > Hi, > > > > About a year ago I spoke at the Flink Forward conference ( > > https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development > > problems regarding streaming applications and handling the lack of events > > in a stream. > > Something I spoke about towards the end of this talk was the idea to ship > > the watermarks of a Flink topology into the intermediate transport > between > > applications so you wouldn't need to recreate them. 
> > > > At that time it was just an idea, today I'm actually trying to build that > > and see if this idea is actually possible. > > > > So the class of applications I work on usually do a keyBy on something > like > > a SessionId, SensorId or IP address. > > In low traffic scenarios this means that in Kafka some partitions are > > completely idle which makes Windows/GroupBy type operations impossible > (in > > my talk I explain it a lot better). > > > > I have a test setup right now to play around with this and I'm running > into > > a bit of a conceptual hurdle for which I'm looking for help. > > > > My goal is to ship the watermarks from within a topology into Kafka and > > then let a follow up application extract those watermarks again and > simply > > continue. > > The new SinkWriter interface has a void writeWatermark(Watermark > > watermark) method > > that seems intended for this kind of thing. > > The basic operations like writing a watermark into Kafka, reading it > again > > and then recreating the watermark again works in my test setup (very > messy > > code but it works). > > > > My hurdle has to do with the combination of > > - different parallelism numbers between Flink and Kafka (how do I ship 2 > > watermarks into 3 partitions) > > - the fact that if you do a keyBy (both in Flink and Kafka) there is a > > likely mismatch between the Flink 'partition' and the Kafka `partition`. > > - processing speed differences between various threads (like session "A" > > needs more CPU cycles/time/processing than session "B") will lead to > > skewing of the progression between them. > > - watermarks in separate threads in a single Flink topology are not > > synchronized (they cannot and should not be). > > > > Has anyone any pointers on possible ways to handle this? > > > > Right now my only idea is to ship the watermark into all partitions (as > > they do not have a key!) 
and let the consuming application determine the > > "real watermark" based on the mix of watermarks coming in from the > upstream > > threads. > > > > All suggestions and ideas are appreciated. > > > > -- > > Best regards / Met vriendelijke groeten, > > > > Niels Basjes > > > -- Best regards / Met vriendelijke groeten, Niels Basjes
[RESULT][VOTE] FLIP-190: Support Version Upgrades for Table API & SQL Programs
Hi everyone, The voting time for FLIP-190: Support Version Upgrades for Table API & SQL Programs[1] has passed. I'm closing the vote now. There were 6 +1 votes, 3 of which are binding: - Jing Zhang (binding) - Ingo Bürk (binding) - Godfrey He (binding) - Martijn Visser (non-binding) - Wenlong (non-binding) - Sergey Nuyanzin (non-binding) There were no -1 votes. Thus, FLIP-190 has been accepted. Thanks everyone for joining the discussion and giving feedback! [1] https://lists.apache.org/thread/rxxxq0q9v9pmzd2ht9nybpg5vrzyhwx7 Cheers, Timo On 16.12.21 12:29, Sergey Nuyanzin wrote: +1 (non-binding) On Wed, Dec 15, 2021 at 11:22 AM godfrey he wrote: +1 (binding) Best, Godfrey Ingo Bürk 于2021年12月15日周三 16:19写道: +1 (binding) Thanks for driving this much needed feature! On 14.12.21 17:45, Timo Walther wrote: Hi everyone, I'd like to start a vote on FLIP-190: Support Version Upgrades for Table API & SQL Programs [1] which has been discussed in this thread [2]. The vote will be open for at least 72 hours unless there is an objection or not enough votes. [1] https://cwiki.apache.org/confluence/x/KZBnCw [2] https://lists.apache.org/thread/n8v32j6o3d50mpblxydbz82q1q436ob4 Cheers, Timo
[jira] [Created] (FLINK-25391) Updating existing table factories for mutable table options
Timo Walther created FLINK-25391: Summary: Updating existing table factories for mutable table options Key: FLINK-25391 URL: https://issues.apache.org/jira/browse/FLINK-25391 Project: Flink Issue Type: Sub-task Components: Connectors / Common Reporter: Timo Walther Update all existing factories for FLINK-25390. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25385) Harden function serialization in JSON plan
Timo Walther created FLINK-25385: Summary: Harden function serialization in JSON plan Key: FLINK-25385 URL: https://issues.apache.org/jira/browse/FLINK-25385 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Timo Walther Similar to FLINK-25230, we should revisit how functions are serialized into the JSON plan. - No legacy in plan - No Java serialization in plan - Consider config option regarding catalog objects - Helpful exceptions for the unsupported cases - Use function version 1 for all functions as a first step -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25389) Update JSON plan with new ExecNodeMetadata
Timo Walther created FLINK-25389: Summary: Update JSON plan with new ExecNodeMetadata Key: FLINK-25389 URL: https://issues.apache.org/jira/browse/FLINK-25389 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Timo Walther This is the first time that we will actually use the annotations. - Look up by name and version - Helpful exceptions for the unsupported cases -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25388) Add annotation to all StreamExec nodes
Timo Walther created FLINK-25388: Summary: Add annotation to all StreamExec nodes Key: FLINK-25388 URL: https://issues.apache.org/jira/browse/FLINK-25388 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner Reporter: Timo Walther Add the initial version. Add proper declaration of consumed options etc. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25390) Support mutable options in DynamicTableFactory
Timo Walther created FLINK-25390: Summary: Support mutable options in DynamicTableFactory Key: FLINK-25390 URL: https://issues.apache.org/jira/browse/FLINK-25390 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Timo Walther Factory update for mutable options.

{code}
DynamicTableFactory {

    /* Declares options that can be modified without runtime implications. */
    Set<ConfigOption<?>> mutableOptions();
}
{code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
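How such a declaration might be consulted is simple set logic: an options update is acceptable only if every changed key is declared mutable, otherwise the plan has to be recompiled. Below is a minimal plain-Java sketch of that check; the class and method names are hypothetical and this is not the proposed Flink interface itself:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Illustration of how declared mutable options could be used: an options
 * update is accepted only if every added, removed, or changed key is in
 * the factory's mutable set. Plain set logic, not Flink code.
 */
final class MutableOptionsValidator {
    static boolean isUpdateAllowed(
            Map<String, String> oldOptions,
            Map<String, String> newOptions,
            Set<String> mutableKeys) {
        // Consider every key present on either side of the update.
        Map<String, String> all = new HashMap<>(oldOptions);
        all.putAll(newOptions);
        for (String key : all.keySet()) {
            String before = oldOptions.get(key);
            String after = newOptions.get(key);
            boolean changed = (before == null) ? after != null : !before.equals(after);
            if (changed && !mutableKeys.contains(key)) {
                return false; // an immutable option changed -> reject / recompile
            }
        }
        return true;
    }
}
```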
[jira] [Created] (FLINK-25392) Add new STATEMENT SET syntax
Timo Walther created FLINK-25392: Summary: Add new STATEMENT SET syntax Key: FLINK-25392 URL: https://issues.apache.org/jira/browse/FLINK-25392 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Timo Walther Support:

{code}
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink SELECT page_id, count(1) FROM clicks GROUP BY page_id;
  INSERT INTO pageview_uv_sink SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
{code}

{code}
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink SELECT page_id, count(1) FROM clicks GROUP BY page_id;
  INSERT INTO pageview_uv_sink SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
{code}

This time we should add this to the SQL parser. We need to figure out a solution for the interactive SQL Client. -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: Sending watermarks into Kafka
Hi Niels, if you have multiple inputs going into a single Kafka partition then you have to calculate the effective watermark by looking at the min watermark from all inputs. You could insert a Flink operator that takes care of it and then writes to a set of partitions in 1:n relationship. Alternatively, you could take a look at Pulsar that wants to support this functionality out of the box [1]. [1] https://github.com/apache/pulsar/issues/12267 Cheers, Till On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes wrote: > Hi, > > About a year ago I spoke at the Flink Forward conference ( > https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development > problems regarding streaming applications and handling the lack of events > in a stream. > Something I spoke about towards the end of this talk was the idea to ship > the watermarks of a Flink topology into the intermediate transport between > applications so you wouldn't need to recreate them. > > At that time it was just an idea, today I'm actually trying to build that > and see if this idea is actually possible. > > So the class of applications I work on usually do a keyBy on something like > a SessionId, SensorId or IP address. > In low traffic scenarios this means that in Kafka some partitions are > completely idle which makes Windows/GroupBy type operations impossible (in > my talk I explain it a lot better). > > I have a test setup right now to play around with this and I'm running into > a bit of a conceptual hurdle for which I'm looking for help. > > My goal is to ship the watermarks from within a topology into Kafka and > then let a follow up application extract those watermarks again and simply > continue. > The new SinkWriter interface has a void writeWatermark(Watermark > watermark) method > that seems intended for this kind of thing. 
> The basic operations like writing a watermark into Kafka, reading it again > and then recreating the watermark again works in my test setup (very messy > code but it works). > > My hurdle has to do with the combination of > - different parallelism numbers between Flink and Kafka (how do I ship 2 > watermarks into 3 partitions) > - the fact that if you do a keyBy (both in Flink and Kafka) there is a > likely mismatch between the Flink 'partition' and the Kafka `partition`. > - processing speed differences between various threads (like session "A" > needs more CPU cycles/time/processing than session "B") will lead to > skewing of the progression between them. > - watermarks in separate threads in a single Flink topology are not > synchronized (they cannot and should not be). > > Has anyone any pointers on possible ways to handle this? > > Right now my only idea is to ship the watermark into all partitions (as > they do not have a key!) and let the consuming application determine the > "real watermark" based on the mix of watermarks coming in from the upstream > threads. > > All suggestions and ideas are appreciated. > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes >
Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)
Hi Nicholas, Hi Junfeng, about the concerns around consistency raised by Martijn: I think a lot of those can be mitigated by using an event time timestamp from which the rules take effect. The reprocessing scenario, for example, is covered by this. If a pattern processor should become active as soon as possible, there will still be inconsistencies between Taskmanagers, but "as soon as possible" is vague anyway, which is why I think that's ok. about naming: The naming with "PatternProcessor" sounds good to me. Final nit: I would go for CEP#patternProcessors, which would be consistent with CEP#pattern. I am not sure about one of the rejected alternatives: > Have each subtask of an operator make the update on their own - It is hard to achieve consistency. - Though the time interval that each subtask makes the update can be the same, the absolute time they make the update might be different. For example, one makes updates at 10:00, 10:05, etc, while another does it at 10:01, 10:06. In this case the subtasks might never process data with the same set of pattern processors. I would have thought that it is quite easy to poll for the rules from each Subtask at *about* the same time. So, this alone does not seem to be enough to rule out this option. I've looped in David Moravek to get his opinion on the additional load imposed on the JM. Thanks, Konstantin On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang wrote: > Hi Yue, > > Thanks for your feedback on the FLIP. I have addressed your questions and > made a corresponding explanation as follows: > > -- About Pattern Updating. If we use PatternProcessorDiscoverer to update > the rules, will it increase the load of JM? For example, if the user wants > the updated rule to take effect immediately, which means that we need to > set a shorter check interval, or there is another scenario when users rarely > update the pattern: will the PatternProcessorDiscoverer do useless checks most of the > time? 
Could a lazy update mode be used, in which the > pattern is only updated when triggered by the user, and nothing is done at other > times? > > PatternProcessorDiscoverer is a user-defined interface to discover the > PatternProcessor updates. Periodically checking the PatternProcessor in the > database is an implementation of the PatternProcessorDiscoverer interface, > which periodically queries the whole PatternProcessor table at a certain > interval. This implementation indeed has the useless checks, and could instead > directly integrate the changelog of the table. In addition to > the implementation of periodically checking the database, there are other > implementations, such as one that provides RESTful services > to receive updates. > > -- I still have some confusion about how the Key Generating Operator and > the CepOperator (Pattern Matching & Processing Operator) work together. If > there are N PatternProcessors, will the Key Generating Operator generate N > keyedStreams, and then N CepOperators would process each key separately? Or > would every CepOperator task process all patterns; if so, does the key type > in each PatternProcessor need to be the same? > > Firstly, the Pattern Matching & Processing Operator is not the CepOperator > at present, because the CepOperator mechanism is based on the NFAState. > Secondly, if there are N PatternProcessors, the Key Generating Operator > combines all the keyedStreams with a keyBy() operation, thus the Pattern > Matching & Processing Operator processes all the patterns. In other > words, the KeySelector of the PatternProcessor is used for the Key > Generating Operator, and the Pattern and PatternProcessFunction of the > PatternProcessor are used for the Pattern Matching & Processing Operator. > Lastly, the key type in each PatternProcessor is the same, regarded as > Object type. 
> > -- Maybe we need to pay attention to this when implementing it. If some Pattern > has been removed or updated, will the partially matched results in the > StateBackend be cleaned up, or do we rely on state TTL to clean up these > expired states? > > If a certain Pattern has been removed or updated, the partially matched > results in the StateBackend will be cleaned up at the next checkpoint. The > partially matched results don't depend on the state TTL of the > StateBackend. > > 4. Will the PatternProcessorManager keep all the active PatternProcessors > in memory? We have also built support for multiple rules and dynamic rule changing. > But we are facing a problem: in some users' usage scenarios they > want to have their own pattern for each user_id, which means that there > could be thousands of patterns, which would make the performance of pattern > matching very poor. We are also trying to solve this problem. > > The PatternProcessorManager keeps all the active PatternProcessors in > memory. For scenarios where they want to have their own pattern for each > user_id, IMO, is it
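The fan-out described above for the Key Generating Operator (each PatternProcessor contributes its own KeySelector with the key treated as Object, and the operator emits one (patternProcessorId, key) pair per event so a single downstream keyBy can serve all processors) can be sketched in plain Java. The names and types are illustrative only, not the FLIP-200 interfaces:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Sketch of the Key Generating Operator idea: one input event is fanned
 * out into a (patternProcessorId, key) pair per registered processor, so
 * one keyBy on the pair can route to all pattern processors at once.
 */
final class KeyGeneratingSketch<E> {
    // patternProcessorId -> that processor's key selector (key treated as Object)
    private final Map<String, Function<E, Object>> keySelectors = new LinkedHashMap<>();

    void register(String patternProcessorId, Function<E, Object> keySelector) {
        keySelectors.put(patternProcessorId, keySelector);
    }

    /** Fan one event out into one routing pair per registered pattern processor. */
    List<Map.Entry<String, Object>> route(E event) {
        List<Map.Entry<String, Object>> out = new ArrayList<>();
        for (Map.Entry<String, Function<E, Object>> e : keySelectors.entrySet()) {
            out.add(Map.entry(e.getKey(), e.getValue().apply(event)));
        }
        return out;
    }
}
```

This also makes the "thousands of patterns" concern concrete: every event is duplicated once per registered processor before the keyBy, so the fan-out cost grows linearly with the number of pattern processors.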
[jira] [Created] (FLINK-25381) JdbcExactlyOnceSinkE2eTest.testInsert fails on AZP
Till Rohrmann created FLINK-25381: - Summary: JdbcExactlyOnceSinkE2eTest.testInsert fails on AZP Key: FLINK-25381 URL: https://issues.apache.org/jira/browse/FLINK-25381 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: 1.15.0 Reporter: Till Rohrmann Fix For: 1.15.0 The test {{JdbcExactlyOnceSinkE2eTest.testInsert}} fails on AZP with

{code}
2021-12-20T03:13:06.5415073Z Dec 20 03:13:06 org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
2021-12-20T03:13:06.5415676Z Dec 20 03:13:06 at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
2021-12-20T03:13:06.5416393Z Dec 20 03:13:06 at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
2021-12-20T03:13:06.5417081Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
2021-12-20T03:13:06.5417686Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
2021-12-20T03:13:06.5418283Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2021-12-20T03:13:06.5418892Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2021-12-20T03:13:06.5424332Z Dec 20 03:13:06 at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258)
2021-12-20T03:13:06.5425255Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2021-12-20T03:13:06.5425994Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2021-12-20T03:13:06.5426730Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2021-12-20T03:13:06.5427397Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2021-12-20T03:13:06.5428007Z Dec 20 03:13:06 at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
2021-12-20T03:13:06.5428649Z Dec 20 03:13:06 at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
2021-12-20T03:13:06.5429604Z Dec 20 03:13:06 at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
2021-12-20T03:13:06.5430674Z Dec 20 03:13:06 at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
2021-12-20T03:13:06.5431696Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
2021-12-20T03:13:06.5432744Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
2021-12-20T03:13:06.5433645Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
2021-12-20T03:13:06.5434697Z Dec 20 03:13:06 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
2021-12-20T03:13:06.5435356Z Dec 20 03:13:06 at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
2021-12-20T03:13:06.5436145Z Dec 20 03:13:06 at akka.dispatch.OnComplete.internal(Future.scala:300)
2021-12-20T03:13:06.5436855Z Dec 20 03:13:06 at akka.dispatch.OnComplete.internal(Future.scala:297)
2021-12-20T03:13:06.5437690Z Dec 20 03:13:06 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
2021-12-20T03:13:06.5438367Z Dec 20 03:13:06 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
2021-12-20T03:13:06.5439147Z Dec 20 03:13:06 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
2021-12-20T03:13:06.5440092Z Dec 20 03:13:06 at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
2021-12-20T03:13:06.5441108Z Dec 20 03:13:06 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
2021-12-20T03:13:06.5442086Z Dec 20 03:13:06 at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
2021-12-20T03:13:06.5443269Z Dec 20 03:13:06 at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
2021-12-20T03:13:06.5444506Z Dec 20 03:13:06 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
2021-12-20T03:13:06.5445078Z Dec 20 03:13:06 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
2021-12-20T03:13:06.5445682Z Dec 20 03:13:06 at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
Re: [jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackso
I just wanted to check in on this, is there any interest in a PR to fix this issue? Looking at the master branch, it still looks to be a problem. Thanks. On Mon, Dec 6, 2021 at 9:52 AM Galen Warren (Jira) wrote: > Galen Warren created FLINK-25197: > > > Summary: Using Statefun RequestReplyFunctionBuilder fails > with Java 8 date/time type `java.time.Duration` not supported by default: > add Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > Key: FLINK-25197 > URL: https://issues.apache.org/jira/browse/FLINK-25197 > Project: Flink > Issue Type: Bug > Components: Stateful Functions > Affects Versions: statefun-3.1.0 > Reporter: Galen Warren > Fix For: statefun-3.1.0 > > > When using RequestReplyFunctionBuilder to build a stateful functions job, > the job fails at runtime with: > > Java 8 date/time type `java.time.Duration` not supported by default: add > Module > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > to enable handling > > It appears this is because, in > [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode| > https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127], > a default instance of ObjectMapper is used to serialize the client > properties, which now include a java.time.Duration. There is a > [StateFunObjectMapper| > https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java] > class in the project that has customized serde support, but it is not used > here. 
> > The fix seems to be to: > * Use an instance of StateFunObjectMapper to serialize the client > properties in RequestReplyFunctionBuilder > * Modify StateFunObjectMapper to both serialize and deserialize > instances of java.time.Duration (currently, only deserialization is > supported) > > I've made these changes locally and it seems to fix the problem. Would you > be interested in a PR? Thanks. > > > > > > -- > This message was sent by Atlassian Jira > (v8.20.1#820001) >
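The serde that the referenced jackson-datatype-jsr310 module provides maps java.time.Duration to its ISO-8601 text form. A stdlib-only sketch of that mapping, with illustrative class and method names that are not Statefun or Jackson APIs:

```java
import java.time.Duration;

// Sketch of the Duration serde the fix needs: write the ISO-8601 string
// form, read it back with Duration.parse. The real fix wires the
// equivalent behavior into StateFunObjectMapper via a Jackson module.
class DurationSerde {
    static String serialize(Duration d) {
        return d.toString(); // ISO-8601, e.g. Duration.ofSeconds(30) -> "PT30S"
    }

    static Duration deserialize(String s) {
        return Duration.parse(s); // round-trips the ISO-8601 form
    }

    public static void main(String[] args) {
        Duration d = Duration.ofMillis(1500);
        String encoded = serialize(d);
        if (!deserialize(encoded).equals(d)) {
            throw new AssertionError("round trip failed");
        }
        System.out.println(encoded);
    }
}
```

Without a module that performs this mapping registered on the ObjectMapper, Jackson fails with exactly the "Java 8 date/time type not supported by default" error quoted above.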
[VOTE] Stateful functions 3.1.1 release
Hi everyone, Please review and vote on the release candidate #2 for the version 3.1.1 of Apache Flink Stateful Functions, as follows: [ ] +1, Approve the release [ ] -1, Do not approve the release (please provide specific comments) This release updates the Flink version to fix the log4j CVEs **Testing Guideline** You can find here [1] a page in the project wiki on instructions for testing. To cast a vote, it is not necessary to perform all listed checks, but please mention which checks you have performed when voting. **Release Overview** As an overview, the release consists of the following: a) Stateful Functions canonical source distribution, to be deployed to the release repository at dist.apache.org b) Stateful Functions Python SDK distributions to be deployed to PyPI c) Maven artifacts to be deployed to the Maven Central Repository d) New Dockerfiles for the release e) GoLang SDK tag v3.1.1-rc2 **Staging Areas to Review** The staging areas containing the above-mentioned artifacts are as follows, for your review: * All artifacts for a) and b) can be found in the corresponding dev repository at dist.apache.org [2] * All artifacts for c) can be found at the Apache Nexus Repository [3] All artifacts are signed with the key 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4] Other links for your review: * JIRA release notes [5] * source code tag "release-3.1.1-rc2" [6] * PR for the new Dockerfiles [7] * PR for the flink website [8] **Vote Duration** The voting time will run for 24 hours. We are targeting this vote to last until December 21st, 22:00 CET, or until it is adopted by majority approval, with at least 3 PMC affirmative votes. 
Thanks, Seth & Igal [1] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release [2] https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.1.1-rc2/ [3] https://repository.apache.org/content/repositories/orgapacheflink-1466 [4] https://dist.apache.org/repos/dist/release/flink/KEYS [5] https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351096==12315522 [6] https://github.com/apache/flink-statefun/tree/release-3.1.1-rc2 [7] https://github.com/apache/flink-statefun-docker/pull/18 [8] https://github.com/apache/flink-web/pull/492
Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)
Hi Konstantin, Martijn Thanks for the detailed feedback in the discussion. What I still have left to answer/reply to: -- Martijn: Just to be sure, this indeed would mean that if for whatever reason the heartbeat timeout, it would crash the job, right? IMO, if the heartbeat times out, the PatternProcessor consistency between the OperatorCoordinator and the subtasks can no longer be checked, so the job would be crashed. -- Konstantin: What I was concerned about is that we basically let users run a UserFunction in the OperatorCoordinator, which it does not seem to have been designed for. In general, we have reached an agreement on the design of this FLIP, but there is a remaining concern about whether letting users run a UserFunction in the OperatorCoordinator is something the OperatorCoordinator was designed for. We would like to invite Becket Qin, the author of the OperatorCoordinator, to help answer this concern. Best, Nicholas Jiang On 2021/12/20 10:07:14 Martijn Visser wrote: > Hi all, > > Really like the discussion on this topic moving forward. I really think > this feature will be much appreciated by the Flink users. What I still have > left to answer/reply to: > > -- Good point. If for whatever reason the different taskmanagers can't get > the latest rule, the Operator Coordinator could send a heartbeat to all > taskmanagers with the latest rules and check the heartbeat response from > all the taskmanagers whether the latest rules of the taskmanager is equal > to these of the Operator Coordinator. > > Just to be sure, this indeed would mean that if for whatever reason the > heartbeat timeout, it would crash the job, right? > > -- We have considered the solution mentioned above. In this solution, I > have some questions about how to guarantee the consistency of the rule > between each TaskManager. 
By having a coordinator in the JobManager to > centrally manage the latest rules, the latest rules of all TaskManagers are > consistent with those of the JobManager, so as to avoid the inconsistencies > that may be encountered in the above solution. Can you introduce how this > solution guarantees the consistency of the rules? > > The consistency that we could guarantee was based on how often each > TaskManager would do a refresh and how often we would accept a refresh to > fail. We set the refresh time to a relatively short one (30 seconds) and > maximum failures to 3. That meant that we could guarantee that rules would > be updated in < 2 minutes or else the job would crash. That was sufficient > for our use cases. This also really depends on how big your cluster is. I > can imagine that if you have a large scale cluster that you want to run, > you don't want to DDOS the backend system where you have your rules stored. > > -- In summary, the current design is that JobManager tells all TaskManagers > the latest rules through OperatorCoordinator, and will initiate a heartbeat > to check whether the latest rules on each TaskManager are consistent. We > will describe how to deal with the Failover scenario in more detail on FLIP. > > Thanks for that. I think having the JobManager tell the TaskManagers the > applicable rules would indeed end up being the best design. > > -- about the concerns around consistency raised by Martijn: I think a lot > of those can be mitigated by using an event time timestamp from which the > rules take effect. The reprocessing scenario, for example, is covered by > this. If a pattern processor should become active as soon as possible, > there will still be inconsistencies between Taskmanagers, but "as soon as > possible" is vague anyway, which is why I think that's ok. > > I think an event timestamp is indeed a really important one. 
We also used > that in my previous role, with the ruleActivationTimestamp compared to > eventtime (well, actually we used Kafka logAppend time because > eventtime wasn't always properly set so we used that time to overwrite the > eventtime from the event itself). > > Best regards, > > Martijn > > On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf wrote: > > > Hi Nicholas, Hi Junfeng, > > > > about the concerns around consistency raised by Martijn: I think a lot of > > those can be mitigated by using an event time timestamp from which the > > rules take effect. The reprocessing scenario, for example, is covered by > > this. If a pattern processor should become active as soon as possible, > > there will still be inconsistencies between Taskmanagers, but "as soon as > > possible" is vague anyway, which is why I think that's ok. > > > > about naming: The naming with "PatternProcessor" sounds good to me. Final > > nit: I would go for CEP#patternProccessors, which would be consistent with > > CEP#pattern. > > > > I am not sure about one of the rejected alternatives: > > > > > Have each subtask of an operator make the update on their own > > > >- > > > >It is hard to achieve consistency. > >
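The heartbeat-based consistency check discussed in this thread can be sketched as follows, assuming the coordinator stamps each rule update with a version number and compares versions reported in heartbeat responses. The names and the version-based scheme are illustrative, not the FLIP-200 API:

```java
import java.util.Map;

// Illustrative sketch: the coordinator keeps the version of the latest
// rules it has broadcast and checks every heartbeat round. A subtask
// that reports an older version (or does not respond, i.e. a heartbeat
// timeout) means the update did not reach it, so the job is failed
// rather than left running with inconsistent rules.
class RuleConsistencyCheck {
    static boolean allConsistent(long coordinatorVersion,
                                 Map<Integer, Long> heartbeatResponses,
                                 int parallelism) {
        if (heartbeatResponses.size() < parallelism) {
            return false; // a missing response is treated like a heartbeat timeout
        }
        return heartbeatResponses.values().stream()
                .allMatch(v -> v == coordinatorVersion);
    }

    public static void main(String[] args) {
        Map<Integer, Long> ok = Map.of(0, 7L, 1, 7L, 2, 7L);
        Map<Integer, Long> stale = Map.of(0, 7L, 1, 6L, 2, 7L);
        System.out.println(allConsistent(7L, ok, 3));    // rules consistent
        System.out.println(allConsistent(7L, stale, 3)); // inconsistent -> crash the job
    }
}
```

Pairing such a version check with the event-time activation timestamp mentioned by Konstantin would bound how long the inconsistency window can last before the job is failed.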
[jira] [Created] (FLINK-25398) Show complete stacktrace when requesting thread dump
Junfan Zhang created FLINK-25398: Summary: Show complete stacktrace when requesting thread dump Key: FLINK-25398 URL: https://issues.apache.org/jira/browse/FLINK-25398 Project: Flink Issue Type: Improvement Reporter: Junfan Zhang -- This message was sent by Atlassian Jira (v8.20.1#820001)
Re: [VOTE] Stateful functions 3.1.1 release
+1 (binding) - Checked hash and signatures - Checked diff contains Flink upgrade - mvn clean install with e2e Thanks, Gordon On Mon, Dec 20, 2021, 13:55 Seth Wiesman wrote: > +1 (non-binding) > > - Verified signatures > - Checked diff > - Checked site PR > - Build from source and ran e2e tests > > Seth > > On Mon, Dec 20, 2021 at 10:59 AM Igal Shilman wrote: > > > Hi everyone, > > > > Please review and vote on the release candidate #2 for the version 3.1.1 > of > > Apache Flink Stateful Functions, as follows: > > [ ] +1, Approve the release > > [ ] -1, Do not approve the release (please provide specific comments) > > > > This release updates the Flink version to fix the log4j CVEs > > > > **Testing Guideline** > > > > You can find here [1] a page in the project wiki on instructions for > > testing. > > To cast a vote, it is not necessary to perform all listed checks, > > but please mention which checks you have performed when voting. > > > > **Release Overview** > > > > As an overview, the release consists of the following: > > a) Stateful Functions canonical source distribution, to be deployed to > the > > release repository at dist.apache.org > > b) Stateful Functions Python SDK distributions to be deployed to PyPI > > c) Maven artifacts to be deployed to the Maven Central Repository > > d) New Dockerfiles for the release > > e) GoLang SDK tag v3.1.1-rc2 > > > > **Staging Areas to Review** > > > > The staging areas containing the above mentioned artifacts are as > follows, > > for your review: > > * All artifacts for a) and b) can be found in the corresponding dev > > repository at dist.apache.org [2] > > * All artifacts for c) can be found at the Apache Nexus Repository [3] > > > > All artifacts are signed with the key > > 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4] > > > > Other links for your review: > > * JIRA release notes [5] > > * source code tag "release-3.0.0-rc1" [6] > > * PR for the new Dockerfiles [7] > > * PR for the flink website [8] > > > > **Vote 
Duration** > > > > The voting time will run for 24 hours. We are targeting this vote to last > > until December. 21nd, 22:00 CET. > > Or It is adopted by majority approval, with at least 3 PMC affirmative > > votes. > > > > Thanks, > > Seth & Igal > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release > > [2] > https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.1.1-rc2/ > > [3] > https://repository.apache.org/content/repositories/orgapacheflink-1466 > > [4] https://dist.apache.org/repos/dist/release/flink/KEYS > > [5] > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351096==12315522 > > [6] https://github.com/apache/flink-statefun/tree/release-3.1.1-rc2 > > [7] https://github.com/apache/flink-statefun-docker/pull/18 > > [8] https://github.com/apache/flink-web/pull/492 > > >
[jira] [Created] (FLINK-25396) lookupjoin source table for pre-partitioning
HunterHunter created FLINK-25396: Summary: lookupjoin source table for pre-partitioning Key: FLINK-25396 URL: https://issues.apache.org/jira/browse/FLINK-25396 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Reporter: HunterHunter When we perform external lookup joins, we want to partition by key first, so that each key is handled by a single task; this reduces the number of external queries and lets each task cache only its share of the external data rather than all of it. Example: select * from sourceTable t1 LEFT JOIN lookuptable FOR SYSTEM_TIME AS OF t1.proctime as t2 ON t1.msg = t2.word Execution Plan like: {code:java} == Optimized Execution Plan == Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word]) +- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word]) +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime]) +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 1:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset]) {code} After I made the optimization, I added a hint configuration (lookup.join.pre-partition) and added a rule to generate an exchange. 
so that I can pre-partition by the join key when obtaining external data synchronously: select * from test t1 LEFT JOIN hbaselookup /*+ OPTIONS('lookup.join.pre-partition'='true') */ FOR SYSTEM_TIME AS OF t1.proctime as t2 ON t1.msg = t2.word {code:java} == Optimized Execution Plan == Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word]) +- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word]) +- Exchange(distribution=[hash[msg]]) +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime]) +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 1:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset]) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
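The effect of the hash exchange in the second plan can be shown with a toy simulation (illustrative only, not planner code): without it, records with the same join key can reach every subtask, so each of them queries and caches the same external row; with the hash exchange, each key is pinned to exactly one subtask.

```java
import java.util.*;

// Toy model of which subtasks "see" (and therefore look up and cache)
// each join key, with and without a hash exchange on the key.
class PrePartition {
    // without the exchange: records are spread over subtasks regardless of key
    static Map<String, Set<Integer>> roundRobinAssignment(List<String> stream, int parallelism) {
        Map<String, Set<Integer>> seenBy = new HashMap<>();
        for (int i = 0; i < stream.size(); i++) {
            seenBy.computeIfAbsent(stream.get(i), k -> new HashSet<>()).add(i % parallelism);
        }
        return seenBy;
    }

    // with Exchange(distribution=[hash[key]]): a key is pinned to one subtask
    static Map<String, Set<Integer>> hashAssignment(List<String> stream, int parallelism) {
        Map<String, Set<Integer>> seenBy = new HashMap<>();
        for (String key : stream) {
            seenBy.computeIfAbsent(key, k -> new HashSet<>())
                  .add(Math.floorMod(key.hashCode(), parallelism));
        }
        return seenBy;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "a", "c", "a", "b");
        // key "a" is cached by several subtasks without the exchange, by one with it
        System.out.println(roundRobinAssignment(keys, 3).get("a").size());
        System.out.println(hashAssignment(keys, 3).get("a").size());
    }
}
```

Fewer caching subtasks per key means fewer redundant external queries and a more even use of each task's cache, which is exactly the motivation the ticket describes.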
[jira] [Created] (FLINK-25397) grouped_execution
ZhuoYu Chen created FLINK-25397: --- Summary: grouped_execution Key: FLINK-25397 URL: https://issues.apache.org/jira/browse/FLINK-25397 Project: Flink Issue Type: Improvement Components: Table SQL / Legacy Planner, Table SQL / Planner, Table SQL / Runtime Affects Versions: 1.15.0 Reporter: ZhuoYu Chen Performing bucketed (grouped) execution: two tables (orders, orders_item) are divided into buckets based on the same field (orderid) and the same number of buckets. When joining by order id, the join and aggregation for each bucket can be computed independently, because matching order ids from both tables land in buckets with the same ids. This has several advantages. 1. Whenever a bucket of data has been computed, the memory occupied by this bucket can be released immediately, so memory consumption can be limited by controlling the number of buckets processed in parallel. 2. It greatly reduces shuffling. -- This message was sent by Atlassian Jira (v8.20.1#820001)
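The bucket-wise join described above can be sketched as follows. This is a toy illustration of the execution strategy, not Flink runtime code; the bucketing function and table shapes are assumptions for the example:

```java
import java.util.*;

// Toy grouped execution: both sides are bucketed on orderId with the same
// bucket count, so each bucket pair joins independently and its working
// memory can be released before the next bucket is processed.
class GroupedExecution {
    static int bucketOf(int orderId, int numBuckets) {
        return Math.floorMod(Integer.hashCode(orderId), numBuckets);
    }

    static int bucketedJoinCount(int[] orderIds, int[][] items, int numBuckets) {
        int joined = 0;
        for (int b = 0; b < numBuckets; b++) {
            // only this bucket's orders are held in memory at a time
            Set<Integer> bucketOrders = new HashSet<>();
            for (int id : orderIds) {
                if (bucketOf(id, numBuckets) == b) bucketOrders.add(id);
            }
            for (int[] item : items) { // item = {orderId, amount}
                if (bucketOf(item[0], numBuckets) == b && bucketOrders.contains(item[0])) {
                    joined++;
                }
            }
            bucketOrders.clear(); // bucket done: its memory can be freed
        }
        return joined;
    }

    public static void main(String[] args) {
        int[] orders = {1, 2, 3, 4};
        int[][] items = {{1, 10}, {2, 20}, {3, 30}, {4, 40}};
        System.out.println(bucketedJoinCount(orders, items, 2)); // every item finds its order
    }
}
```

Because a join key can only ever appear in one bucket on each side, processing buckets one at a time loses no matches while bounding peak memory to the largest bucket.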
Re: [jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackso
Hi Galen, Sorry for the late reply, a lot of people are on sporadic schedules with the holidays. A PR would be very welcome! I've gone ahead and assigned you the Jira. And I'll watch the repo to review your pr. cheers, Seth On Mon, Dec 20, 2021 at 8:24 AM Galen Warren wrote: > I just wanted to check in on this, is there any interest in a PR to fix > this issue? Looking at the master branch, it still looks to be a problem. > Thanks. > > On Mon, Dec 6, 2021 at 9:52 AM Galen Warren (Jira) > wrote: > > > Galen Warren created FLINK-25197: > > > > > > Summary: Using Statefun RequestReplyFunctionBuilder fails > > with Java 8 date/time type `java.time.Duration` not supported by default: > > add Module > > > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > > to enable handling > > Key: FLINK-25197 > > URL: https://issues.apache.org/jira/browse/FLINK-25197 > > Project: Flink > > Issue Type: Bug > > Components: Stateful Functions > > Affects Versions: statefun-3.1.0 > > Reporter: Galen Warren > > Fix For: statefun-3.1.0 > > > > > > When using RequestReplyFunctionBuilder to build a stateful functions job, > > the job fails at runtime with: > > > > Java 8 date/time type `java.time.Duration` not supported by default: add > > Module > > > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > > to enable handling > > > > It appears this is because, in > > [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode| > > > https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127 > ], > > a default instance of ObjectMapper is used to serialize the client > > properties, which now include a java.time.Duration. 
There is a > > [StateFunObjectMapper| > > > https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java > ] > > class in the project that has customized serde support, but it is not > used > > here. > > > > The fix seems to be to: > > * Use an instance of StateFunObjectMapper to serialize the client > > properties in RequestReplyFunctionBuilder > > * Modify StateFunObjecdtMapper to both serialize and deserialize > > instances of java.time.Duration (currently, only deserialization is > > supported) > > > > I've made these changes locally and it seems to fix the problem. Would > you > > be interested in a PR? Thanks. > > > > > > > > > > > > -- > > This message was sent by Atlassian Jira > > (v8.20.1#820001) > > >
Re: [DISCUSS] FLIP-203: Incremental savepoints
Hi Piotr, Thanks a lot for starting the discussion. Big +1. In my understanding, this FLIP introduces the snapshot format as a *really* user facing concept. IMO it is important that we document a) that it is no longer the checkpoint/savepoint characteristic that determines the kind of changes that a snapshot allows (user code, state schema evolution, topology changes), but that this now becomes a property of the format regardless of whether this is a savepoint or a checkpoint b) the exact changes that each format allows (code, state schema, topology, state backend, max parallelism) In this context: will the native format support state schema evolution? If not, I am not sure we can let the format default to native. Thanks, Konstantin On Mon, Dec 20, 2021 at 2:09 PM Piotr Nowojski wrote: > Hi devs, > > I would like to start a discussion about a previously announced follow up > of the FLIP-193 [1], namely allowing savepoints to be in native format and > incremental. The changes do not seem invasive. The full proposal is > written down as FLIP-203: Incremental savepoints [2]. Please take a look, > and let me know what you think. > > Best, > Piotrek > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership > [2] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-203%3A+Incremental+savepoints#FLIP203:Incrementalsavepoints-Semantic > -- Konstantin Knauf https://twitter.com/snntrable https://github.com/knaufk
Re: Sending watermarks into Kafka
I think this problem should be tackled inside Kafka, not Flink. Kafka already has internal control messages to write transaction markers. Those could be extended to carry watermark information. It would be best to generalize those as "user control messages", and watermarks could just be one application. In addition, we might need something like a "producer group" to track how many producers are writing into a partition: this would allow informing downstream consumers how many different watermarks they need to track. It's not an easy problem to solve, and trying to solve it at the processing layer rather than integrating with the storage layer makes it even harder. -Matthias On 12/20/21 01:57, Niels Basjes wrote: I'm reading the Pulsar PIP and noticed another thing to take into account: multiple applications (with each a different parallelism) that all write into the same topic. On Mon, 20 Dec 2021, 10:45 Niels Basjes, wrote: Hi Till, This morning I also realized what you call an 'effective watermark' is indeed what is needed. I'm going to read up on what Pulsar has planned. What I realized is that the consuming application must be aware of the parallelism of the producing application, which is independent of the partitions in the intermediate transport. Assume I produce in parallel 2 and have 5 kafka partition which I then read in parallel 3; then in the consuming (parallel 3) application I must wait for watermarks from each original input before I can continue: which is 2 Also we must assume that those watermarks are created at different timestamps. So my current assessment is that the watermark records must include at least the timestamp, the number of the thread for this watermark and the total number of threads . Niels On Mon, Dec 20, 2021 at 10:10 AM Till Rohrmann wrote: Hi Niels, if you have multiple inputs going into a single Kafka partition then you have to calculate the effective watermark by looking at the min watermark from all inputs. 
You could insert a Flink operator that takes care of it and then writes to a set of partitions in 1:n relationship. Alternatively, you could take a look at Pulsar that wants to support this functionality out of the box [1]. [1] https://github.com/apache/pulsar/issues/12267 Cheers, Till On Sun, Dec 19, 2021 at 4:46 PM Niels Basjes wrote: Hi, About a year ago I spoke at the Flink Forward conference ( https://www.youtube.com/watch?v=wqRDyrE3dwg ) about handling development problems regarding streaming applications and handling the lack of events in a stream. Something I spoke about towards the end of this talk was the idea to ship the watermarks of a Flink topology into the intermediate transport between applications so you wouldn't need to recreate them. At that time it was just an idea, today I'm actually trying to build that and see if this idea is actually possible. So the class of applications I work on usually do a keyBy on something like a SessionId, SensorId or IP address. In low traffic scenarios this means that in Kafka some partitions are completely idle which makes Windows/GroupBy type operations impossible (in my talk I explain it a lot better). I have a test setup right now to play around with this and I'm running into a bit of a conceptual hurdle for which I'm looking for help. My goal is to ship the watermarks from within a topology into Kafka and then let a follow up application extract those watermarks again and simply continue. The new SinkWriter interface has a void writeWatermark(Watermark watermark) method that seems intended for this kind of thing. The basic operations like writing a watermark into Kafka, reading it again and then recreating the watermark again works in my test setup (very messy code but it works). 
My hurdle has to do with the combination of - different parallelism numbers between Flink and Kafka (how do I ship 2 watermarks into 3 partitions) - the fact that if you do a keyBy (both in Flink and Kafka) there is a likely mismatch between the Flink 'partition' and the Kafka `partition`. - processing speed differences between various threads (like session "A" needs more CPU cycles/time/processing than session "B") will lead to skewing of the progression between them. - watermarks in separate threads in a single Flink topology are not synchronized (they cannot and should not be). Has anyone any pointers on possible ways to handle this? Right now my only idea is to ship the watermark into all partitions (as they do not have a key!) and let the consuming application determine the "real watermark" based on the mix of watermarks coming in from the upstream threads. All suggestions and ideas are appreciated. -- Best regards / Met vriendelijke groeten, Niels Basjes -- Best regards / Met vriendelijke groeten, Niels Basjes
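The scheme Niels sketches, broadcasting watermark records that carry (timestamp, thread number, total number of threads) into all partitions and letting the consumer derive the effective watermark as the minimum over all producer threads, can be written down as a small sketch. The class and method names are illustrative, not a Flink or Kafka API:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative consumer-side tracker for the "effective watermark":
// each watermark record carries (timestamp, producerThread, totalThreads).
// The effective watermark can only advance to the minimum of the latest
// watermark seen from each producer thread, and only once every thread
// has been heard from at least once.
class EffectiveWatermark {
    private final Map<Integer, Long> latestPerProducer = new HashMap<>();
    private int expectedProducers = -1;

    /** Returns the effective watermark, or Long.MIN_VALUE while incomplete. */
    long onWatermark(long timestamp, int producerThread, int totalThreads) {
        expectedProducers = totalThreads;
        latestPerProducer.merge(producerThread, timestamp, Math::max);
        if (latestPerProducer.size() < expectedProducers) {
            return Long.MIN_VALUE; // not all producer threads seen yet
        }
        return latestPerProducer.values().stream()
                .mapToLong(Long::longValue).min().getAsLong();
    }

    public static void main(String[] args) {
        EffectiveWatermark wm = new EffectiveWatermark();
        System.out.println(wm.onWatermark(100, 0, 2)); // thread 1 unseen: no watermark yet
        System.out.println(wm.onWatermark(90, 1, 2));  // min(100, 90) = 90
        System.out.println(wm.onWatermark(120, 0, 2)); // thread 1 still lags: stays 90
    }
}
```

Note this still assumes the producer's parallelism is constant for the lifetime of the topic; rescaling the producer, or several applications writing into the same topic as noted above, would additionally require identifying which application a watermark came from.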
Re: [jira] [Created] (FLINK-25197) Using Statefun RequestReplyFunctionBuilder fails with Java 8 date/time type `java.time.Duration` not supported by default: add Module "org.apache.flink.shaded.jackso
Thanks for getting back to me and Happy Holidays! I'll open a PR soon, it's a simple fix. On Mon, Dec 20, 2021 at 9:42 AM Seth Wiesman wrote: > Hi Galen, > > Sorry for the late reply, a lot of people are on sporadic schedules with > the holidays. A PR would be very welcome! I've gone ahead and assigned you > the Jira. And I'll watch the repo to review your pr. > > cheers, > > Seth > > On Mon, Dec 20, 2021 at 8:24 AM Galen Warren > wrote: > > > I just wanted to check in on this, is there any interest in a PR to fix > > this issue? Looking at the master branch, it still looks to be a problem. > > Thanks. > > > > On Mon, Dec 6, 2021 at 9:52 AM Galen Warren (Jira) > > wrote: > > > > > Galen Warren created FLINK-25197: > > > > > > > > > Summary: Using Statefun RequestReplyFunctionBuilder fails > > > with Java 8 date/time type `java.time.Duration` not supported by > default: > > > add Module > > > > > > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > > > to enable handling > > > Key: FLINK-25197 > > > URL: > https://issues.apache.org/jira/browse/FLINK-25197 > > > Project: Flink > > > Issue Type: Bug > > > Components: Stateful Functions > > > Affects Versions: statefun-3.1.0 > > > Reporter: Galen Warren > > > Fix For: statefun-3.1.0 > > > > > > > > > When using RequestReplyFunctionBuilder to build a stateful functions > job, > > > the job fails at runtime with: > > > > > > Java 8 date/time type `java.time.Duration` not supported by default: > add > > > Module > > > > > > "org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype:jackson-datatype-jsr310" > > > to enable handling > > > > > > It appears this is because, in > > > [RequestReplyFunctionBuilder::transportClientPropertiesAsObjectNode| > > > > > > 
https://github.com/apache/flink-statefun/blob/b4ba9547b8f0105a28544fd28a5e0433666e9023/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java#L127 > > ], > > > a default instance of ObjectMapper is used to serialize the client > > > properties, which now include a java.time.Duration. There is a > > > [StateFunObjectMapper| > > > > > > https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java > > ] > > > class in the project that has customized serde support, but it is not > > used > > > here. > > > > > > The fix seems to be to: > > > * Use an instance of StateFunObjectMapper to serialize the client > > > properties in RequestReplyFunctionBuilder > > > * Modify StateFunObjecdtMapper to both serialize and deserialize > > > instances of java.time.Duration (currently, only deserialization is > > > supported) > > > > > > I've made these changes locally and it seems to fix the problem. Would > > you > > > be interested in a PR? Thanks. > > > > > > > > > > > > > > > > > > -- > > > This message was sent by Atlassian Jira > > > (v8.20.1#820001) > > > > > >
[jira] [Created] (FLINK-25394) [Flink-ML] Upgrade log4j to 2.17.0 to address CVE-2021-45105
Abdelrahman created FLINK-25394: --- Summary: [Flink-ML] Upgrade log4j to 2.17.0 to address CVE-2021-45105 Key: FLINK-25394 URL: https://issues.apache.org/jira/browse/FLINK-25394 Project: Flink Issue Type: Improvement Affects Versions: 1.14.2 Reporter: Abdelrahman Apache Log4j2 versions 2.0-alpha1 through 2.16.0 did not protect from uncontrolled recursion from self-referential lookups. When the logging configuration uses a non-default Pattern Layout with a Context Lookup (for example, $${ctx:loginId}), attackers with control over Thread Context Map (MDC) input data can craft malicious input data that contains a recursive lookup, resulting in a StackOverflowError that will terminate the process. This is also known as a DOS (Denial of Service) attack. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25395) Incremental shared state might be discarded by TM
Roman Khachatryan created FLINK-25395:
-----------------------------------------

             Summary: Incremental shared state might be discarded by TM
                 Key: FLINK-25395
                 URL: https://issues.apache.org/jira/browse/FLINK-25395
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.15.0
            Reporter: Roman Khachatryan
             Fix For: 1.15.0

Extracting from [FLINK-25185 discussion|https://issues.apache.org/jira/browse/FLINK-25185?focusedCommentId=17462639=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17462639]

On checkpoint abortion or any failure in AsyncCheckpointRunnable, it discards the state, in particular shared (incremental) state. Since FLINK-24611, this creates a problem because shared state can be re-used for future checkpoints.

Needs confirmation.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
Re: [VOTE] Stateful functions 3.1.1 release
+1 (non-binding)

- Verified signatures
- Checked diff
- Checked site PR
- Built from source and ran e2e tests

Seth

On Mon, Dec 20, 2021 at 10:59 AM Igal Shilman wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version 3.1.1 of
> Apache Flink Stateful Functions, as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> This release updates the Flink version to fix the log4j CVEs.
>
> **Testing Guideline**
>
> You can find here [1] a page in the project wiki with instructions for
> testing.
> To cast a vote, it is not necessary to perform all listed checks,
> but please mention which checks you have performed when voting.
>
> **Release Overview**
>
> As an overview, the release consists of the following:
> a) Stateful Functions canonical source distribution, to be deployed to the
>    release repository at dist.apache.org
> b) Stateful Functions Python SDK distributions to be deployed to PyPI
> c) Maven artifacts to be deployed to the Maven Central Repository
> d) New Dockerfiles for the release
> e) GoLang SDK tag v3.1.1-rc2
>
> **Staging Areas to Review**
>
> The staging areas containing the above-mentioned artifacts are as follows,
> for your review:
> * All artifacts for a) and b) can be found in the corresponding dev
>   repository at dist.apache.org [2]
> * All artifacts for c) can be found at the Apache Nexus Repository [3]
>
> All artifacts are signed with the key
> 73BC0A2B04ABC80BF0513382B0ED0E338D622A92 [4]
>
> Other links for your review:
> * JIRA release notes [5]
> * source code tag "release-3.1.1-rc2" [6]
> * PR for the new Dockerfiles [7]
> * PR for the flink website [8]
>
> **Vote Duration**
>
> The voting time will run for 24 hours. We are targeting this vote to last
> until December 21st, 22:00 CET.
> It is adopted by majority approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Seth & Igal
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-statefun-3.1.1-rc2/
> [3] https://repository.apache.org/content/repositories/orgapacheflink-1466
> [4] https://dist.apache.org/repos/dist/release/flink/KEYS
> [5] https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12351096==12315522
> [6] https://github.com/apache/flink-statefun/tree/release-3.1.1-rc2
> [7] https://github.com/apache/flink-statefun-docker/pull/18
> [8] https://github.com/apache/flink-web/pull/492