Unsubscribe
Unsubscribe | lizikunn | lizik...@163.com | (signature customized by NetEase Mail Master)
[jira] [Created] (FLINK-22883) Select view columns fail when store metadata with hive
ELLEX_SHEN created FLINK-22883:
--
Summary: Select view columns fail when store metadata with hive
Key: FLINK-22883
URL: https://issues.apache.org/jira/browse/FLINK-22883
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: ELLEX_SHEN

1. Flink's metadata is stored with Hive. After creating and saving a view, a SELECT on it fails with a column-not-found error.
2. There is no problem on v1.12.3, but after upgrading to v1.13.0 / v1.13.1, querying the view still fails because the columns cannot be found.
3. Example code; the final INSERT statement below fails:

-- --- 0. Create the database ---
-- initialization
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'test'
    -- 'hive-conf-dir' = '/usr/local/share/flink-1.13.0/conf' -- reads the local hive-site.xml by default
);
USE CATALOG myhive;
SET table.sql-dialect=default;

-- --- 1. Kafka source data ---
DROP TABLE IF EXISTS e_click;
CREATE TABLE e_click (
    user_id STRING,
    ts STRING,
    log_ts TIMESTAMP(3),
    proctime AS PROCTIME(),
    WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.properties.bootstrap.servers' = 'master.northking.com:9092,slave1.northking.com:9092,slave2.northking.com:9092',
    'connector.topic' = 'user_clicks',
    'connector.properties.group.id' = 'click',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json'
);
DROP VIEW IF EXISTS e_click_v;
CREATE VIEW e_click_v AS SELECT * FROM e_click;

--
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS e_click_hive;
CREATE TABLE e_click_hive (
    user_id STRING,
    ts STRING
) PARTITIONED BY (
    dt STRING
) STORED AS parquet TBLPROPERTIES (
    'sink.partition-commit.trigger'='partition-time',
    'partition.time-extractor.timestamp-pattern'='$dt',
    'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- view -> Hive
SET table.sql-dialect=default;
INSERT INTO e_click_hive
SELECT user_id, ts, DATE_FORMAT(log_ts, '-MM-dd') FROM e_click_v;

4. Error message:

Caused by:
org.apache.calcite.runtime.CalciteContextException: From line 2, column 12 to line 2, column 18: Column 'user_id' not found in any table
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
	at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5833)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:5982)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:5967)
	at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5416)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:398)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
Re: [DISCUSS] Watermark propagation with Sink API
I understand your scenario but I disagree with its assumptions: "However, the partition of A is empty and thus A is temporarily idle." - you're assuming that the behavior of the source is to mark itself idle if data isn't available, but that's clearly source-specific and not behavior we expect to have in Pulsar source. A partition may be empty indefinitely while still being active. Imagine that the producer is defending a lease - "I'm here, there's no data, please don't advance the clock". "we bind idleness to wall clock time" - you're characterizing a specific strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of the pipeline. I wouldn't recommend using withIdleness() with source-based watermarks. I do agree that dynamism in partition assignment can wreak havoc on watermark correctness. We have some ideas on the Pulsar side about that too. I would ask that we focus on the Flink framework and pipeline behavior. By offering a more powerful framework, we encourage stream storage systems to "rise to the occasion" - treat event time in a first-class way, optimize for correctness, etc. In this case, FLIP-167 is setting the stage for evolution in Pulsar. Thanks again Arvid for the great discussion. On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise wrote: > At least one big motivation is having (temporary) empty partitions. Let me > give you an example, why imho idleness is only approximate in this case: > Assume you have source subtask A, B, C that correspond to 3 source > partitions and a downstream keyed window operator W. > > W would usually trigger on min_watermark(A, B, C). However, the partition > of A is empty and thus A is temporarily idle. So W triggers on > min_watermark(B, C). When A is now active again, the watermark implicitly > is min_watermark(B, C) for A! > > Let's further assume that the source is filled by another pipeline before. 
> This pipeline experiences technical difficulties for X minutes and could > not produce into the partition of A, hence the idleness. When the upstream > pipeline resumes it fills A with some records that are before > min_watermark(B, C). Any watermark generated from these records is > discarded as the watermark is monotonic. Therefore, these records will be > considered late by W and discarded. > > Without idleness, we would have simply blocked W until the upstream pipeline > fully recovers and we would not have had any late records. The same holds > for any reprocessing where the data of partition A is continuous. > > If you look deeper, the issue is that we bind idleness to wall clock time > (e.g. advance watermark after X seconds without data). Then we assume the > watermark of the idle partition to be in sync with the slowest partition. > However, in the case of hiccups, this assumption does not hold at all. > I don't see any fix for that (easy or not easy) and imho it's inherent to > the design of idleness. > We lack information (why is no data coming) and have a heuristic to fix it. > > In the case of partition assignment where one subtask has no partition, we > are probably somewhat safe. We know why no data is coming (no partition) > and as long as we do not have dynamic partition assignment, there will > never be a switch to active without restart (for the foreseeable future). > > On Fri, Jun 4, 2021 at 10:34 PM Eron Wright .invalid> > wrote: > > > Yes I'm talking about an implementation of idleness that is unrelated to > > processing time. The clear example is partition assignment to subtasks, > > which probably motivated Flink's idleness functionality in the first > place. > > > > On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise wrote: > > > > > Hi Eron, > > > > > > Are you referring to an implementation of idleness that does not rely > on > > a > > > wall clock but on some clock baked into the partition information of > the > > > source system?
> > > If so, you are right that it invalidates my points. > > > Do you have an example on where this is used? > > > > > > With a wall clock, you always run into the issues that I describe since > > you > > > are effectively mixing event time and processing time... > > > > > > > > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright > > .invalid> > > > wrote: > > > > > > > Dawid, I think you're mischaracterizing the idleness signal as > > > inherently a > > > > heuristic, but Flink does not impose that. A source-based watermark > > (and > > > > corresponding idleness signal) may well be entirely data-driven, > > entirely > > > > deterministic. Basically you're underselling what the pipeline is > > > capable > > > > of, based on painful experiences with using the generic, > > heuristics-based > > > > watermark assigner. Please don't let those experiences overshadow > > what's > > > > possible with source-based watermarking. > > > > > > > > The idleness signal does have a strict definition, it indicates > whether > > > the > > > > stream is actively participating in advancing the event time
Re: [DISCUSS] Watermark propagation with Sink API
At least one big motivation is having (temporary) empty partitions. Let me give you an example why, imho, idleness is only approximate in this case: assume you have source subtasks A, B, C that correspond to 3 source partitions and a downstream keyed window operator W.

W would usually trigger on min_watermark(A, B, C). However, the partition of A is empty and thus A is temporarily idle. So W triggers on min_watermark(B, C). When A is now active again, the watermark implicitly is min_watermark(B, C) for A!

Let's further assume that the source is filled by another pipeline before. This pipeline experiences technical difficulties for X minutes and could not produce into the partition of A, hence the idleness. When the upstream pipeline resumes, it fills A with some records that are before min_watermark(B, C). Any watermark generated from these records is discarded as the watermark is monotonic. Therefore, these records will be considered late by W and discarded.

Without idleness, we would have simply blocked W until the upstream pipeline fully recovers and we would not have had any late records. The same holds for any reprocessing where the data of partition A is continuous.

If you look deeper, the issue is that we bind idleness to wall clock time (e.g. advance the watermark after X seconds without data). Then we assume the watermark of the idle partition to be in sync with the slowest partition. However, in the case of hiccups, this assumption does not hold at all. I don't see any fix for that (easy or not easy) and imho it's inherent to the design of idleness. We lack information (why is no data coming) and have a heuristic to fix it.

In the case of partition assignment where one subtask has no partition, we are probably somewhat safe. We know why no data is coming (no partition) and as long as we do not have dynamic partition assignment, there will never be a switch to active without restart (for the foreseeable future).
On Fri, Jun 4, 2021 at 10:34 PM Eron Wright wrote: > Yes I'm talking about an implementation of idleness that is unrelated to > processing time. The clear example is partition assignment to subtasks, > which probably motivated Flink's idleness functionality in the first place. > > On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise wrote: > > > Hi Eron, > > > > Are you referring to an implementation of idleness that does not rely on > a > > wall clock but on some clock baked into the partition information of the > > source system? > > If so, you are right that it invalidates my points. > > Do you have an example on where this is used? > > > > With a wall clock, you always run into the issues that I describe since > you > > are effectively mixing event time and processing time... > > > > > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright > .invalid> > > wrote: > > > > > Dawid, I think you're mischaracterizing the idleness signal as > > inherently a > > > heuristic, but Flink does not impose that. A source-based watermark > (and > > > corresponding idleness signal) may well be entirely data-driven, > entirely > > > deterministic. Basically you're underselling what the pipeline is > > capable > > > of, based on painful experiences with using the generic, > heuristics-based > > > watermark assigner. Please don't let those experiences overshadow > what's > > > possible with source-based watermarking. > > > > > > The idleness signal does have a strict definition, it indicates whether > > the > > > stream is actively participating in advancing the event time clock. > The > > > status of all participants is considered when aggregating watermarks. > A > > > source subtask generally makes the determination based on data, e.g. > > > whether a topic is assigned to that subtask. > > > > > > We have here a modest proposal to add callbacks to the sink function > for > > > information that the sink operator already receives. 
The practical > > result > > > is improved correctness when used with streaming systems that have > > > first-class support for event time. The specific changes may be > > previewed > > > here: > > > https://github.com/apache/flink/pull/15950 > > > https://github.com/streamnative/flink/pull/2 > > > > > > Thank you all for the robust discussion. Do I have your support to > > proceed > > > to enhance FLIP-167 with idleness callbacks and to proceed to a vote? > > > > > > Eron > > > > > > > > > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise wrote: > > > > > > > While everything I wrote before is still valid, upon further > > rethinking, > > > I > > > > think that the conclusion is not necessarily correct: > > > > - If the user wants to have pipeline A and B behaving as if A+B was > > > jointly > > > > executed in the same pipeline without the intermediate Pulsar topic, > > > having > > > > the idleness in that topic is the only way to guarantee consistency. > > > > - We could support the following in the respective sources: If the > user > > > > wants to use a different definition of
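The empty-partition scenario Arvid describes above can be simulated outside Flink. Below is a minimal sketch in plain Python (not Flink API; the `combined_watermark` helper and all timestamps are invented for illustration) showing how excluding an idle input from the min-aggregation makes the input's later records late:

```python
# Toy model of watermark aggregation with idleness (not Flink code).
# Inputs A, B, C feed one downstream operator W. An idle input is
# excluded from the min-aggregation, so W's clock runs ahead of it.

IDLE = None  # an idle input contributes no watermark


def combined_watermark(watermarks):
    """Min over the watermarks of all *active* inputs."""
    active = [w for w in watermarks.values() if w is not IDLE]
    return min(active) if active else float("-inf")


# While A is idle, W triggers on min(B, C).
wm = {"A": IDLE, "B": 100, "C": 120}
operator_watermark = combined_watermark(wm)  # 100, ignoring A

# A becomes active again and delivers records delayed by an upstream
# outage. Anything timestamped before the operator watermark is late.
resumed_records = [80, 90, 105]
late = [ts for ts in resumed_records if ts < operator_watermark]
on_time = [ts for ts in resumed_records if ts >= operator_watermark]
```

Without the idleness shortcut, the operator's watermark would have waited for A, and the resumed records at 80 and 90 would not have been dropped.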
Re: [DISCUSS] Watermark propagation with Sink API
Yes I'm talking about an implementation of idleness that is unrelated to processing time. The clear example is partition assignment to subtasks, which probably motivated Flink's idleness functionality in the first place. On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise wrote: > Hi Eron, > > Are you referring to an implementation of idleness that does not rely on a > wall clock but on some clock baked into the partition information of the > source system? > If so, you are right that it invalidates my points. > Do you have an example on where this is used? > > With a wall clock, you always run into the issues that I describe since you > are effectively mixing event time and processing time... > > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright .invalid> > wrote: > > > Dawid, I think you're mischaracterizing the idleness signal as > inherently a > > heuristic, but Flink does not impose that. A source-based watermark (and > > corresponding idleness signal) may well be entirely data-driven, entirely > > deterministic. Basically you're underselling what the pipeline is > capable > > of, based on painful experiences with using the generic, heuristics-based > > watermark assigner. Please don't let those experiences overshadow what's > > possible with source-based watermarking. > > > > The idleness signal does have a strict definition, it indicates whether > the > > stream is actively participating in advancing the event time clock. The > > status of all participants is considered when aggregating watermarks. A > > source subtask generally makes the determination based on data, e.g. > > whether a topic is assigned to that subtask. > > > > We have here a modest proposal to add callbacks to the sink function for > > information that the sink operator already receives. The practical > result > > is improved correctness when used with streaming systems that have > > first-class support for event time. 
The specific changes may be > previewed > > here: > > https://github.com/apache/flink/pull/15950 > > https://github.com/streamnative/flink/pull/2 > > > > Thank you all for the robust discussion. Do I have your support to > proceed > > to enhance FLIP-167 with idleness callbacks and to proceed to a vote? > > > > Eron > > > > > > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise wrote: > > > > > While everything I wrote before is still valid, upon further > rethinking, > > I > > > think that the conclusion is not necessarily correct: > > > - If the user wants to have pipeline A and B behaving as if A+B was > > jointly > > > executed in the same pipeline without the intermediate Pulsar topic, > > having > > > the idleness in that topic is the only way to guarantee consistency. > > > - We could support the following in the respective sources: If the user > > > wants to use a different definition of idleness in B, they can > just > > > provide a new idleness definition. At that point, we should discard the > > > idleness in the intermediate topic while reading. > > > > > > If we would agree on the latter way, I think having the idleness in the > > > topic is of great use because it's a piece of information that cannot > be > > > inferred as stated by others. Consequently, we would be able to support > > all > > > use cases and can give the user the freedom to express his intent. > > > > > > > > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise wrote: > > > > > > > I think the core issue in this discussion is that we kind of assume > > that > > > > idleness is something universally well-defined. But it's not. It's a > > > > heuristic to advance data processing in event time where we would > lack > > > data > > > > to do so otherwise.
> > > > Keep in mind that idleness has no real definition in terms of event > > time > > > > and leads to severe unexpected results: If you reprocess a data > stream > > > with > > > > temporarily idle partitions, these partitions would not be deemed > idle > > on > > > > reprocessing and there is a realistic chance that records that were > > > deemed > > > > late in the live processing case are now perfectly fine records in > the > > > > reprocessing case. (I can expand on that if that was too short) > > > > > > > > With that in mind, why would a downstream process even try to > calculate > > > > the same idleness state as the upstream process? I don't see a point; > > we > > > > would just further any imprecision in the calculation. > > > > > > > > Let's have a concrete example. Assume that we have upstream pipeline > A > > > and > > > > downstream pipeline B. A has plenty of resources and is live > processing > > > > data. Some partitions are idle and that is propagated to the sinks. > > Now B > > > > is heavily backpressured and consumes very slowly. B doesn't see any > > > > idleness directly. B can calculate exact watermarks and use all > records > > > for > > > > its calculation. Reprocessing would yield the same result for B. If > we > > > now > > > > forward idleness, we can easily find
Re: [DISCUSS] Watermark propagation with Sink API
Hi Eron, Are you referring to an implementation of idleness that does not rely on a wall clock but on some clock baked into the partition information of the source system? If so, you are right that it invalidates my points. Do you have an example on where this is used? With a wall clock, you always run into the issues that I describe since you are effectively mixing event time and processing time... On Fri, Jun 4, 2021 at 6:28 PM Eron Wright wrote: > Dawid, I think you're mischaracterizing the idleness signal as inherently a > heuristic, but Flink does not impose that. A source-based watermark (and > corresponding idleness signal) may well be entirely data-driven, entirely > deterministic. Basically you're underselling what the pipeline is capable > of, based on painful experiences with using the generic, heuristics-based > watermark assigner. Please don't let those experiences overshadow what's > possible with source-based watermarking. > > The idleness signal does have a strict definition, it indicates whether the > stream is actively participating in advancing the event time clock. The > status of all participants is considered when aggregating watermarks. A > source subtask generally makes the determination based on data, e.g. > whether a topic is assigned to that subtask. > > We have here a modest proposal to add callbacks to the sink function for > information that the sink operator already receives. The practical result > is improved correctness when used with streaming systems that have > first-class support for event time. The specific changes may be previewed > here: > https://github.com/apache/flink/pull/15950 > https://github.com/streamnative/flink/pull/2 > > Thank you all for the robust discussion. Do I have your support to proceed > to enhance FLIP-167 with idleness callbacks and to proceed to a vote? 
> > Eron > > > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise wrote: > > > While everything I wrote before is still valid, upon further rethinking, > I > > think that the conclusion is not necessarily correct: > > - If the user wants to have pipeline A and B behaving as if A+B was > jointly > > executed in the same pipeline without the intermediate Pulsar topic, > having > > the idleness in that topic is the only way to guarantee consistency. > > - We could support the following in the respective sources: If the user > > wants to use a different definition of idleness in B, they can just > > provide a new idleness definition. At that point, we should discard the > > idleness in the intermediate topic while reading. > > > > If we would agree on the latter way, I think having the idleness in the > > topic is of great use because it's a piece of information that cannot be > > inferred as stated by others. Consequently, we would be able to support > all > > use cases and can give the user the freedom to express his intent. > > > > > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise wrote: > > > > > I think the core issue in this discussion is that we kind of assume > that > > > idleness is something universally well-defined. But it's not. It's a > > > heuristic to advance data processing in event time where we would lack > > data > > > to do so otherwise. > > > Keep in mind that idleness has no real definition in terms of event > time > > > and leads to severe unexpected results: If you reprocess a data stream > > with > > > temporarily idle partitions, these partitions would not be deemed idle > on > > > reprocessing and there is a realistic chance that records that were > > deemed > > > late in the live processing case are now perfectly fine records in the > > > reprocessing case. (I can expand on that if that was too short) > > > > > > With that in mind, why would a downstream process even try to calculate > > > the same idleness state as the upstream process?
I don't see a point; > we > > > would just further any imprecision in the calculation. > > > > > > Let's have a concrete example. Assume that we have upstream pipeline A > > and > > > downstream pipeline B. A has plenty of resources and is live processing > > > data. Some partitions are idle and that is propagated to the sinks. > Now B > > > is heavily backpressured and consumes very slowly. B doesn't see any > > > idleness directly. B can calculate exact watermarks and use all records > > for > > > its calculation. Reprocessing would yield the same result for B. If we > > now > > > forward idleness, we can easily find cases where we would advance the > > > watermark prematurely while there is data directly available to > calculate > > > the exact watermark. > > > > > > For me, idleness is just a pipeline-specific heuristic and should be > > > viewed as such. > > > > > > Best, > > > > > > Arvid > > > > > > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski > > > wrote: > > > > > >> Hi, > > >> > > >> > Imagine you're starting consuming from the result channel in a > > situation > > >> where you have: > > >> > record4, record3,
Re: [VOTE] Watermark propagation with Sink API
Little update on this, more good discussion over the last few days, and the FLIP will probably be amended to incorporate idleness. Voting will remain open until, let's say, mid-next week. On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski wrote: > I would like to ask you to hold on with counting the votes until I get an > answer for my one question in the dev mailing list thread (sorry if it was > already covered somewhere). > > Best, Piotrek > > czw., 3 cze 2021 o 16:12 Jark Wu napisał(a): > > > +1 (binding) > > > > Best, > > Jark > > > > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz > > wrote: > > > > > +1 (binding) > > > > > > Best, > > > > > > Dawid > > > > > > On 03/06/2021 03:50, Zhou, Brian wrote: > > > > +1 (non-binding) > > > > > > > > Thanks Eron, looking forward to seeing this feature soon. > > > > > > > > Thanks, > > > > Brian > > > > > > > > -Original Message- > > > > From: Arvid Heise > > > > Sent: Wednesday, June 2, 2021 15:44 > > > > To: dev > > > > Subject: Re: [VOTE] Watermark propagation with Sink API > > > > > > > > > > > > [EXTERNAL EMAIL] > > > > > > > > +1 (binding) > > > > > > > > Thanks Eron for driving this effort; it will open up new exciting use > > > cases. > > > > > > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright > > .invalid> > > > > wrote: > > > > > > > >> After some good discussion about how to enhance the Sink API to > > > >> process watermarks, I believe we're ready to proceed with a vote. > > > >> Voting will be open until at least Friday, June 4th, 2021. 
> > > >>
> > > >> Reference:
> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > >>
> > > >> Discussion thread:
> > > >> https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> > > >>
> > > >> Implementation Issue:
> > > >> https://issues.apache.org/jira/browse/FLINK-22700
> > > >>
> > > >> Thanks,
> > > >> Eron Wright
> > > >> StreamNative
> > > >>
> > > >
> > > >
> > >
Re: Add control mode for flink
I agree with Steven. This logic can be added in a dynamic config framework that can bind into Flink operators. We probably don't need to let the Flink runtime handle it.

On Fri, Jun 4, 2021 at 8:11 AM Steven Wu wrote:

> I am not sure if we should solve this problem in Flink. This is more like
> a dynamic config problem that probably should be solved by some
> configuration framework. Here is one post from a Google search:
> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>
> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 wrote:
>
>> Hi everyone,
>>
>> Flink jobs are always long-running. While the job is running, users
>> may want to control the job without stopping it. The reasons for control
>> can differ, for example:
>>
>> 1. Change data processing logic, such as a filter condition.
>> 2. Send trigger events to move the progress forward.
>> 3. Define some tools to degrade the job, such as limiting input QPS or
>>    sampling data.
>> 4. Change the log level to debug a current problem.
>>
>> The common way to do this is to stop the job, make the modifications,
>> and restart the job. It may take a long time to recover. In some
>> situations, stopping the job is intolerable, for example when the job is
>> related to money or important activities. So we need some way to control
>> a running job without stopping it.
>>
>> We propose to add a control mode for Flink. A control mode based on a
>> RESTful interface is introduced first. It works in these steps:
>>
>> 1. The user predefines some logic that supports config control, such as
>>    a filter condition.
>> 2. Run the job.
>> 3. If the user wants to change the job's running logic, they just send a
>>    RESTful request with the corresponding config.
>>
>> Other control modes will also be considered in the future. More details
>> can be found in the doc
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> .
>> If the community likes the proposal, more discussion is needed and a more
>> detailed design will be given later. Any suggestions and ideas are welcome.
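The proposed flow (predefine configurable logic, run the job, push new config at runtime) can be sketched without any Flink dependency. A hypothetical illustration in plain Python, where a filter predicate re-reads a shared config object that a control endpoint would update (the `DynamicConfig` class and the `min_amount` key are invented for this sketch):

```python
import threading


class DynamicConfig:
    """Thread-safe key/value store that a RESTful control endpoint
    could update while the job keeps running."""

    def __init__(self, **initial):
        self._lock = threading.Lock()
        self._values = dict(initial)

    def get(self, key):
        with self._lock:
            return self._values[key]

    def update(self, key, value):  # called by the control endpoint
        with self._lock:
            self._values[key] = value


config = DynamicConfig(min_amount=100)


def keep(record):
    # The filter re-reads the config on every record, so a control
    # request takes effect without restarting the job.
    return record["amount"] >= config.get("min_amount")


records = [{"amount": 50}, {"amount": 150}, {"amount": 250}]
before = [r for r in records if keep(r)]  # threshold 100
config.update("min_amount", 200)          # simulated control request
after = [r for r in records if keep(r)]   # threshold 200
```

In a real deployment the `update` call would be triggered by the job's RESTful control interface, and the predicate would live inside a user-defined Flink operator.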
Re: [DISCUSS] Watermark propagation with Sink API
Dawid, I think you're mischaracterizing the idleness signal as inherently a heuristic, but Flink does not impose that. A source-based watermark (and corresponding idleness signal) may well be entirely data-driven, entirely deterministic. Basically you're underselling what the pipeline is capable of, based on painful experiences with using the generic, heuristics-based watermark assigner. Please don't let those experiences overshadow what's possible with source-based watermarking.

The idleness signal does have a strict definition: it indicates whether the stream is actively participating in advancing the event time clock. The status of all participants is considered when aggregating watermarks. A source subtask generally makes the determination based on data, e.g. whether a topic is assigned to that subtask.

We have here a modest proposal to add callbacks to the sink function for information that the sink operator already receives. The practical result is improved correctness when used with streaming systems that have first-class support for event time. The specific changes may be previewed here:
https://github.com/apache/flink/pull/15950
https://github.com/streamnative/flink/pull/2

Thank you all for the robust discussion. Do I have your support to proceed to enhance FLIP-167 with idleness callbacks and to proceed to a vote?

Eron

On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise wrote: > While everything I wrote before is still valid, upon further rethinking, I > think that the conclusion is not necessarily correct: > - If the user wants to have pipeline A and B behaving as if A+B was jointly > executed in the same pipeline without the intermediate Pulsar topic, having > the idleness in that topic is the only way to guarantee consistency. > - We could support the following in the respective sources: If the user > wants to use a different definition of idleness in B, they can just > provide a new idleness definition.
At that point, we should discard the > idleness in the intermediate topic while reading. > > If we would agree on the latter way, I think having the idleness in the > topic is of great use because it's a piece of information that cannot be > inferred as stated by others. Consequently, we would be able to support all > use cases and can give the user the freedom to express his intent. > > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise wrote: > > > I think the core issue in this discussion is that we kind of assume that > > idleness is something universally well-defined. But it's not. It's a > > heuristic to advance data processing in event time where we would lack > data > > to do so otherwise. > > Keep in mind that idleness has no real definition in terms of event time > > and leads to severe unexpected results: If you reprocess a data stream > with > > temporarily idle partitions, these partitions would not be deemed idle on > > reprocessing and there is a realistic chance that records that were > deemed > > late in the live processing case are now perfectly fine records in the > > reprocessing case. (I can expand on that if that was too short) > > > > With that in mind, why would a downstream process even try to calculate > > the same idleness state as the upstream process? I don't see a point; we > > would just further any imprecision in the calculation. > > > > Let's have a concrete example. Assume that we have upstream pipeline A > and > > downstream pipeline B. A has plenty of resources and is live processing > > data. Some partitions are idle and that is propagated to the sinks. Now B > > is heavily backpressured and consumes very slowly. B doesn't see any > > idleness directly. B can calculate exact watermarks and use all records > for > > its calculation. Reprocessing would yield the same result for B.
If we > now > > forward idleness, we can easily find cases where we would advance the > > watermark prematurely while there is data directly available to calculate > > the exact watermark. > > > > For me, idleness is just a pipeline-specific heuristic and should be > > viewed as such. > > > > Best, > > > > Arvid > > > > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski > > wrote: > > > >> Hi, > >> > >> > Imagine you're starting consuming from the result channel in a > situation > >> where you have: > >> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, > >> record1, record0 > >> > Switching to the encoded StreamStatus.IDLE is unnecessary, and might > >> cause the record3 and record4 to be late depending on how the watermark > >> progressed in other partitions. > >> > >> Yes, I understand this point. But it can also be the other way around. > >> There might be a large gap between record2 and record3, and users might > >> prefer or might not be able to duplicate idleness detection logic. The > >> downstream system might be lacking some kind of information (that is > only > >> available in the top level/ingesting system) to correctly set the idle > >> status. > >>
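Eron's point that "the status of all participants is considered when aggregating watermarks" can be sketched in a few lines. This is a minimal illustrative sketch only (class and method names are invented, not Flink's actual `StatusWatermarkValve` API): idle channels are simply excluded from the min-aggregation so they cannot hold back the event-time clock.

```java
import java.util.Arrays;

/** Hypothetical sketch of combining per-channel watermarks while honoring
 *  idleness: the combined watermark is the minimum over ACTIVE channels only. */
final class WatermarkAggregator {
    private final long[] watermarks;
    private final boolean[] active;

    WatermarkAggregator(int channels) {
        watermarks = new long[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        active = new boolean[channels];
        Arrays.fill(active, true);
    }

    void onWatermark(int channel, long wm) {
        watermarks[channel] = Math.max(watermarks[channel], wm);
        active[channel] = true; // receiving a watermark re-activates the channel
    }

    void markIdle(int channel) {
        active[channel] = false;
    }

    /** Min over active channels; Long.MIN_VALUE if every channel is idle. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (active[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

This makes the trade-off in the thread concrete: marking channel 1 idle lets channel 0 advance the combined watermark, which is exactly what can make late any records that later arrive on channel 1.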
[jira] [Created] (FLINK-22882) Tasks are blocked while emitting records
Piotr Nowojski created FLINK-22882: -- Summary: Tasks are blocked while emitting records Key: FLINK-22882 URL: https://issues.apache.org/jira/browse/FLINK-22882 Project: Flink Issue Type: Bug Components: Runtime / Network, Runtime / Task Affects Versions: 1.14.0 Reporter: Piotr Nowojski On a cluster I observed symptoms of tasks being blocked for long time, causing long delays with unaligned checkpointing. 99% of those cases were caused by `broadcastEmit` of the stream status {noformat} 2021-06-04 14:41:44,049 ERROR org.apache.flink.runtime.io.network.buffer.LocalBufferPool [] - Blocking wait [11059 ms] for an available buffer. java.lang.Exception: Stracktracegenerator at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:246) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:67) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.writeStreamStatus(RecordWriterOutput.java:136) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus.ensureActive(AnnouncedStatus.java:65) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at
[jira] [Created] (FLINK-22881) Tasks are blocked while emitting stream status
Piotr Nowojski created FLINK-22881: -- Summary: Tasks are blocked while emitting stream status Key: FLINK-22881 URL: https://issues.apache.org/jira/browse/FLINK-22881 Project: Flink Issue Type: Bug Components: Runtime / Network, Runtime / Task Affects Versions: 1.14.0 Reporter: Piotr Nowojski On a cluster I observed symptoms of tasks being blocked for long time, causing long delays with unaligned checkpointing. 99% of those cases were caused by `broadcastEmit` of the stream status ``` 2021-06-04 14:41:44,049 ERROR org.apache.flink.runtime.io.network.buffer.LocalBufferPool [] - Blocking wait [11059 ms] for an available buffer. java.lang.Exception: Stracktracegenerator at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:246) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:67) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.writeStreamStatus(RecordWriterOutput.java:136) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus.ensureActive(AnnouncedStatus.java:65) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] at
Re: [DISCUSS] Watermark propagation with Sink API
While everything I wrote before is still valid, upon further rethinking, I think that the conclusion is not necessarily correct: - If the user wants to have pipeline A and B behaving as if A+B was jointly executed in the same pipeline without the intermediate Pulsar topic, having the idleness in that topic is the only way to guarantee consistency. - We could support the following in the respective sources: If the user wants to use a different definition of idleness in B, they can just provide a new idleness definition. At that point, we should discard the idleness in the intermediate topic while reading. If we agreed on the latter, I think having the idleness in the topic is of great use because it's a piece of information that cannot be inferred as stated by others. Consequently, we would be able to support all use cases and can give the user the freedom to express his intent. On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise wrote: > I think the core issue in this discussion is that we kind of assume that > idleness is something universally well-defined. But it's not. It's a > heuristic to advance data processing in event time where we would lack data > to do so otherwise. > Keep in mind that idleness has no real definition in terms of event time > and leads to severe unexpected results: If you reprocess a data stream with > temporarily idle partitions, these partitions would not be deemed idle on > reprocessing and there is a realistic chance that records that were deemed > late in the live processing case are now perfectly fine records in the > reprocessing case. (I can expand on that if that was too short) > > With that in mind, why would a downstream process even try to calculate > the same idleness state as the upstream process? I don't see a point; we > would just further any imprecision in the calculation. > > Let's have a concrete example. Assume that we have upstream pipeline A and > downstream pipeline B. 
A has plenty of resources and is live processing > data. Some partitions are idle and that is propagated to the sinks. Now B > is heavily backpressured and consumes very slowly. B doesn't see any > idleness directly. B can calculate exact watermarks and use all records for > its calculation. Reprocessing would yield the same result for B. If we now > forward idleness, we can easily find cases where we would advance the > watermark prematurely while there is data directly available to calculate > the exact watermark. > > For me, idleness is just a pipeline-specific heuristic and should be > viewed as such. > > Best, > > Arvid > > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski > wrote: > >> Hi, >> >> > Imagine you're starting consuming from the result channel in a situation >> where you have: >> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, >> record1, record0 >> > Switching to the encoded StreamStatus.IDLE is unnecessary, and might >> cause the record3 and record4 to be late depending on how the watermark >> progressed in other partitions. >> >> Yes, I understand this point. But it can also be the other way around. >> There might be a large gap between record2 and record3, and users might >> prefer or might not be able to duplicate idleness detection logic. The >> downstream system might be lacking some kind of information (that is only >> available in the top level/ingesting system) to correctly set the idle >> status. >> >> Piotrek >> >> pt., 4 cze 2021 o 12:30 Dawid Wysakowicz >> napisał(a): >> >> > >> > Same as Eron I don't follow this point. Any streaming sink can be used >> as >> > this kind of transient channel. Streaming sinks, like Kafka, are also >> used >> > to connect one streaming system with another one, also for an immediate >> > consumption. >> > >> > Sure it can, but imo it is rarely the primary use case why you want to >> > offload the channels to an external persistent system. 
Again in my >> > understanding StreamStatus is something transient, e.g. part of our >> > external system went offline. I think those kinds of events should not be >> > persisted. >> > >> > Both watermarks and idleness status can be some >> > inherent property of the underlying data stream. If an >> upstream/ingesting >> > system knows that this particular stream/partition of a stream is going >> > idle (for example for a couple of hours), why does this information >> have to >> > be re-created in the downstream system using some heuristic? It could be >> > explicitly encoded. >> > >> > Because it's most certainly not true in the downstream. The idleness >> works >> > usually according to a heuristic: "We have not seen records for 5 >> minutes, >> > so there is a fair chance we won't see records for the next 5 minutes, >> so >> > let's not wait for watermarks for now." That heuristic most certainly >> won't >> > hold for a downstream persistent storage. >> > >> > Imagine you're starting consuming from the result channel in a situation >> > where you have: >> > >> >
Re: Add control mode for flink
I am not sure if we should solve this problem in Flink. This is more like a dynamic config problem that probably should be solved by some configuration framework. Here is one post from a google search: https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a On Fri, Jun 4, 2021 at 7:09 AM 刘建刚 wrote: > Hi everyone, > > Flink jobs are always long-running. While a job is running, users > may want to control the job without stopping it. The reasons for control can > differ, for example: > 1. Change the data-processing logic, such as a filter condition. > 2. Send trigger events to move progress forward. > 3. Provide tools to degrade the job, such as limiting input QPS or sampling data. > 4. Change the log level to debug a current problem. > > The common way to do this is to stop the job, make the modifications, and > restart the job. It may take a long time to recover. In some situations > stopping the job is intolerable, for example when the job is related to money or > important activities. So we need some way to control a running > job without stopping it. > > > We propose to add a control mode for Flink. A control mode based on a > REST interface is introduced first. It works in these steps: > 1. The user predefines some logic which supports config control, such as a filter condition. > 2. Run the job. > 3. If the user wants to change the job's running logic, just send a > REST request with the corresponding config. > Other control modes will also be considered in the future. More > details can be found in the doc > https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing > . If the community likes the proposal, more discussion is needed and a more > detailed design will be given later. Any suggestions and ideas are welcome. > >
Add control mode for flink
Hi everyone,

Flink jobs are always long-running. While a job is running, users may want to control the job without stopping it. The reasons for control can differ, for example:

1. Change the data-processing logic, such as a filter condition.
2. Send trigger events to move progress forward.
3. Provide tools to degrade the job, such as limiting input QPS or sampling data.
4. Change the log level to debug a current problem.

The common way to do this is to stop the job, make the modifications, and restart the job. It may take a long time to recover. In some situations stopping the job is intolerable, for example when the job is related to money or important activities. So we need some way to control a running job without stopping it.

We propose to add a control mode for Flink. A control mode based on a REST interface is introduced first. It works in these steps:

1. The user predefines some logic which supports config control, such as a filter condition.
2. Run the job.
3. If the user wants to change the job's running logic, just send a REST request with the corresponding config.

Other control modes will also be considered in the future. More details can be found in the doc https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing . If the community likes the proposal, more discussion is needed and a more detailed design will be given later. Any suggestions and ideas are welcome.
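The "predefine some logic which supports config control" step above can be sketched as follows. This is an illustrative sketch only, not the proposed Flink API: the filter's threshold lives behind an `AtomicReference`, and a REST handler (not shown) would swap in a new config at runtime without restarting the job.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical user-defined filter whose condition can be updated at runtime.
 *  All names here are invented for illustration. */
final class ControllableFilter {
    /** Immutable config snapshot; replaced wholesale on each update. */
    static final class Config {
        final long minValue;
        Config(long minValue) { this.minValue = minValue; }
    }

    private final AtomicReference<Config> config =
            new AtomicReference<>(new Config(0));

    /** Called by the control endpoint when the user posts a new config. */
    void updateConfig(Config newConfig) {
        config.set(newConfig);
    }

    /** Called per record on the hot path; always reads the latest config. */
    boolean accept(long value) {
        return value >= config.get().minValue;
    }
}
```

The design point is that the hot path only does a volatile read, so updating the filter condition is cheap and thread-safe without coordination between the control endpoint and the processing threads.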
Re: [DISCUSS] Watermark propagation with Sink API
I think the core issue in this discussion is that we kind of assume that idleness is something universally well-defined. But it's not. It's a heuristic to advance data processing in event time where we would lack data to do so otherwise. Keep in mind that idleness has no real definition in terms of event time and leads to severe unexpected results: If you reprocess a data stream with temporarily idle partitions, these partitions would not be deemed idle on reprocessing and there is a realistic chance that records that were deemed late in the live processing case are now perfectly fine records in the reprocessing case. (I can expand on that if that was too short) With that in mind, why would a downstream process even try to calculate the same idleness state as the upstream process? I don't see a point; we would just further any imprecision in the calculation. Let's have a concrete example. Assume that we have upstream pipeline A and downstream pipeline B. A has plenty of resources and is live processing data. Some partitions are idle and that is propagated to the sinks. Now B is heavily backpressured and consumes very slowly. B doesn't see any idleness directly. B can calculate exact watermarks and use all records for its calculation. Reprocessing would yield the same result for B. If we now forward idleness, we can easily find cases where we would advance the watermark prematurely while there is data directly available to calculate the exact watermark. For me, idleness is just a pipeline-specific heuristic and should be viewed as such. Best, Arvid On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski wrote: > Hi, > > > Imagine you're starting consuming from the result channel in a situation > where you have: > > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, > record1, record0 > > Switching to the encoded StreamStatus.IDLE is unnecessary, and might > cause the record3 and record4 to be late depending on how the watermark > progressed in other partitions. 
> > Yes, I understand this point. But it can also be the other way around. > There might be a large gap between record2 and record3, and users might > prefer or might not be able to duplicate idleness detection logic. The > downstream system might be lacking some kind of information (that is only > available in the top level/ingesting system) to correctly set the idle > status. > > Piotrek > > pt., 4 cze 2021 o 12:30 Dawid Wysakowicz > napisał(a): > > > > > Same as Eron I don't follow this point. Any streaming sink can be used as > > this kind of transient channel. Streaming sinks, like Kafka, are also > used > > to connect one streaming system with another one, also for an immediate > > consumption. > > > > Sure it can, but imo it is rarely the primary use case why you want to > > offload the channels to an external persistent system. Again in my > > understanding StreamStatus is something transient, e.g. part of our > > external system went offline. I think those kinds of events should not be > > persisted. > > > > Both watermarks and idleness status can be some > > inherent property of the underlying data stream. If an upstream/ingesting > > system knows that this particular stream/partition of a stream is going > > idle (for example for a couple of hours), why does this information have > to > > be re-created in the downstream system using some heuristic? It could be > > explicitly encoded. > > > > Because it's most certainly not true in the downstream. The idleness > works > > usually according to a heuristic: "We have not seen records for 5 > minutes, > > so there is a fair chance we won't see records for the next 5 minutes, so > > let's not wait for watermarks for now." That heuristic most certainly > won't > > hold for a downstream persistent storage. 
> > > > Imagine you're starting consuming from the result channel in a situation > > where you have: > > > > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, > record1, > > record0 > > > > Switching to the encoded StreamStatus.IDLE is unnecessary, and might > cause > > the record3 and record4 to be late depending on how the watermark > > progressed in other partitions. > > > > I understand Eron's use case, which is not about storing the > StreamStatus, > > but performing an immediate aggregation or said differently changing the > > partitioning/granularity of records and watermarks externally to Flink. > The > > partitioning produced by Flink is actually never persisted in that case. > In > > this case I agree exposing the StreamStatus makes sense. I am still > > concerned it will lead to storing the StreamStatus which can lead to many > > subtle problems. > > On 04/06/2021 11:53, Piotr Nowojski wrote: > > > > Hi, > > > > Thanks for picking up this discussion. For the record, I also think we > > shouldn't expose latency markers. > > > > About the stream status > > > > > > Persisting the StreamStatus > > > > I don't agree with the view that sinks are "storing" the
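The heuristic Dawid quotes above ("we have not seen records for 5 minutes, so let's not wait for watermarks for now") can be sketched in a few lines. This is an illustrative sketch with invented names, not Flink's `withIdleness` implementation; note it is driven by processing time, which is exactly why, as Arvid argues, the result is pipeline-specific and will not reproduce on reprocessing.

```java
/** Hypothetical processing-time idleness detector: a partition is declared
 *  idle once no record has arrived within the timeout, so it stops holding
 *  back the watermark. */
final class IdlenessDetector {
    private final long timeoutMillis;
    private long lastRecordTimeMillis;

    IdlenessDetector(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastRecordTimeMillis = nowMillis;
    }

    /** Any record re-arms the timeout and implicitly marks the partition active. */
    void onRecord(long nowMillis) {
        lastRecordTimeMillis = nowMillis;
    }

    /** True once the processing-time gap since the last record exceeds the timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastRecordTimeMillis >= timeoutMillis;
    }
}
```

Because `isIdle` depends on wall-clock gaps rather than anything in the data, a backpressured or replayed consumer observes different gaps and therefore a different idleness history, which is the core of the argument against blindly persisting the upstream status.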
Re: [DISCUSS] Dashboard/HistoryServer authentication
As I've said I am not a security expert and that's why I have to ask for clarification, Gabor. You are saying that if we configure a truststore for the REST endpoint with a single trusted certificate which has been generated by the operator of the Flink cluster, then the attacker can generate a new certificate, sign it and then talk to the Flink cluster if he has access to the node on which the REST endpoint runs? My understanding was that you need the corresponding private key which in my proposed setup would be under the control of the operator as well (e.g. stored in a keystore on the same machine but guarded by some secret). That way (if I am not mistaken), only the entity which has access to the keystore is able to talk to the Flink cluster. Maybe we are also getting our wires crossed here and are talking about different things. Thanks for listing the pros and cons of Kerberos. Concerning what other authentication mechanisms are used in the industry, I am not 100% sure. Cheers, Till On Fri, Jun 4, 2021 at 11:09 AM Gabor Somogyi wrote: > > I did not mean for the user to sign its own certificates but for the > operator of the cluster. Once the user request hits the proxy, it should no > longer be under his control. I think I do not fully understand yet why this > would not work. > I said it's not solving the authentication problem over any proxy. Even if > the operator signs the certificate, one can have access to an internal > node. > In such a case anybody can craft certificates which are accepted by the server. > When they are accepted, a bad guy can cancel jobs causing huge impacts. > > > Also, I am missing a bit the comparison of Kerberos to other > authentication mechanisms and why they were rejected in favour of Kerberos. > PROS: > * Since it does not depend on the cloud provider and/or k8s or bare-metal etc. 
> deployment it's the biggest plus > * Centralized, with tools available, so no need to write tons of tooling around it > * There are clients/tools on almost all OS-es and in several languages > * Very large users have been using it for years in production without major issues > * Provides cross-realm trust possibility amongst other features > * Several open source components use it, which could increase > compatibility > > CONS: > * Not everybody is using Kerberos > * It would increase the code footprint, but this is true for many features > (as a side note I'm here to maintain it) > > Feel free to add your points because this only represents a single viewpoint. > Also if you have any better option for strong authentication please share > it and we can consider the pros/cons here. > > BR, > G > > > On Fri, Jun 4, 2021 at 10:32 AM Till Rohrmann > wrote: > >> I did not mean for the user to sign its own certificates but for the >> operator of the cluster. Once the user request hits the proxy, it should no >> longer be under his control. I think I do not fully understand yet why this >> would not work. >> >> What I would like to avoid is to add more complexity into Flink if there >> is an easy solution which fulfills the requirements. That's why I would >> like to work thoroughly through the different alternatives. Also, I am >> missing a bit the comparison of Kerberos to other authentication mechanisms >> and why they were rejected in favour of Kerberos. >> >> Cheers, >> Till >> >> On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra wrote: >> >>> Hi! >>> >>> I think there might be possible alternatives but it seems Kerberos on >>> the rest endpoint ticks all the right boxes and provides a super clean and >>> simple solution for strong authentication. >>> >>> I wouldn’t even consider sidecar proxies etc if we can solve it in such >>> a simple way as proposed by G. 
>>> >>> Cheers >>> Gyula >>> >>> On Fri, 4 Jun 2021 at 10:03, Till Rohrmann wrote: I am not saying that we shouldn't add a strong authentication mechanism if there are good reasons for it. I primarily would like to understand the context a bit better in order to give qualified feedback and come to a good decision. In order to do this, I have the feeling that we haven't fully considered all available options which are on the table, tbh. Does the problem of certificate expiry also apply to self-signed certificates? If yes, then this should also be a problem for the internal encryption of Flink's communication. If not, then one could use self-signed certificates with a longer validity to solve the mentioned issue. I think you can set up Flink in such a way that you don't have to handle all the different certificates. For example, you could deploy Flink with a "sidecar proxy" which is responsible for the authentication using an arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local network interface. That way, the REST endpoint would only be available through the sidecar proxy. Additionally, one could enable SSL for this communication. Would this be a
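Till's mutual-TLS point above rests on a standard JSSE property: with client authentication required, a client must prove possession of the private key matching a certificate the server trusts; merely seeing or copying a certificate is not enough to complete the handshake. A minimal sketch (the class name is invented; keystore/truststore setup is omitted, so this only demonstrates the server-side flag, not a full handshake):

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;

/** Illustrative sketch: an SSLServerSocket configured so the TLS handshake
 *  fails unless the client authenticates with a key whose certificate chain
 *  validates against the server's truststore. */
final class MutualTlsSketch {
    static boolean requiresClientCert() {
        try {
            SSLServerSocket socket = (SSLServerSocket) SSLContext.getDefault()
                    .getServerSocketFactory().createServerSocket(0);
            // With this flag set, the server demands a client certificate and
            // verifies the client's signature with the cert's public key, so
            // only a holder of the private key can connect.
            socket.setNeedClientAuth(true);
            boolean required = socket.getNeedClientAuth();
            socket.close();
            return required;
        } catch (Exception e) {
            return false;
        }
    }
}
```

In Till's proposed setup, the truststore would contain only the operator-issued CA or certificate, so an attacker on an internal node still cannot authenticate without the operator-guarded private key.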
[jira] [Created] (FLINK-22880) Remove "blink" term in code base
Timo Walther created FLINK-22880: Summary: Remove "blink" term in code base Key: FLINK-22880 URL: https://issues.apache.org/jira/browse/FLINK-22880 Project: Flink Issue Type: Sub-task Components: Documentation, Table SQL / API Reporter: Timo Walther Assignee: Timo Walther Apart from FLINK-22879 and some API parts (such as EnvironmentSettings and old SQL Client YAML), we should not use the term "blink" in the code base and documentation anymore. To give some background information, we should only document that the current planner was called "blink planner" in the past. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22879) Remove "blink" suffix from table modules
Timo Walther created FLINK-22879: Summary: Remove "blink" suffix from table modules Key: FLINK-22879 URL: https://issues.apache.org/jira/browse/FLINK-22879 Project: Flink Issue Type: Sub-task Components: Table SQL / Planner, Table SQL / Runtime Reporter: Timo Walther In order to reduce confusion around "What is Flink?/What is Blink?" we should remove the term {{blink}} from the following modules:
{code}
flink-table-planner-blink
flink-table-runtime-blink
flink-table-uber-blink
{code}
It is up for discussion whether we will:
- just perform the refactoring,
- keep the old modules for a while,
- or move the contents to new modules and link to those from the old modules.
In any case we should make sure to keep the Git history. -- This message was sent by Atlassian Jira (v8.3.4#803005)
Re: [DISCUSS] Watermark propagation with Sink API
Hi, > Imagine you're starting consuming from the result channel in a situation where you have: > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, record1, record0 > Switching to the encoded StreamStatus.IDLE is unnecessary, and might cause the record3 and record4 to be late depending on how the watermark progressed in other partitions. Yes, I understand this point. But it can also be the other way around. There might be a large gap between record2 and record3, and users might prefer or might not be able to duplicate idleness detection logic. The downstream system might be lacking some kind of information (that is only available in the top level/ingesting system) to correctly set the idle status. Piotrek pt., 4 cze 2021 o 12:30 Dawid Wysakowicz napisał(a): > > Same as Eron I don't follow this point. Any streaming sink can be used as > this kind of transient channel. Streaming sinks, like Kafka, are also used > to connect one streaming system with another one, also for an immediate > consumption. > > Sure it can, but imo it is rarely the primary use case why you want to > offload the channels to an external persistent system. Again in my > understanding StreamStatus is something transient, e.g. part of our > external system went offline. I think those kinds of events should not be > persisted. > > Both watermarks and idleness status can be some > inherent property of the underlying data stream. If an upstream/ingesting > system knows that this particular stream/partition of a stream is going > idle (for example for a couple of hours), why does this information have to > be re-created in the downstream system using some heuristic? It could be > explicitly encoded. > > Because it's most certainly not true in the downstream. The idleness works > usually according to a heuristic: "We have not seen records for 5 minutes, > so there is a fair chance we won't see records for the next 5 minutes, so > let's not wait for watermarks for now." 
That heuristic most certainly won't > hold for a downstream persistent storage. > > Imagine you're starting consuming from the result channel in a situation > were you have: > > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, record1, > record0 > > Switching to the encoded StreamStatus.IDLE is unnecessary, and might cause > the record3 and record4 to be late depending on how the watermark > progressed in other partitions. > > I understand Eron's use case, which is not about storing the StreamStatus, > but performing an immediate aggregation or said differently changing the > partitioning/granularity of records and watermarks externally to Flink. The > produced by Flink partitioning is actually never persisted in that case. In > this case I agree exposing the StreamStatus makes sense. I am still > concerned it will lead to storing the StreamStatus which can lead to many > subtle problems. > On 04/06/2021 11:53, Piotr Nowojski wrote: > > Hi, > > Thanks for picking up this discussion. For the record, I also think we > shouldn't expose latency markers. > > About the stream status > > > Persisting the StreamStatus > > I don't agree with the view that sinks are "storing" the data/idleness > status. This nomenclature makes only sense if we are talking about > streaming jobs producing batch data. > > > In my understanding a StreamStatus makes sense only when talking about > immediately consumed transient channels such as between operators within > a single job. > > Same as Eron I don't follow this point. Any streaming sink can be used as > this kind of transient channel. Streaming sinks, like Kafka, are also used > to connect one streaming system with another one, also for an immediate > consumption. > > You could say the same thing about watermarks (note they are usually > generated in Flink based on the incoming events) and I would not agree with > it in the same way. 
Both watermarks and idleness status can be some > inherent property of the underlying data stream. if an upstream/ingesting > system knows that this particular stream/partition of a stream is going > idle (for example for a couple of hours), why does this information have to > be re-created in the downstream system using some heuristic? It could be > explicitly encoded. If you want to pass watermarks explicitly to a next > downstream streaming system, because you do not want to recreate them from > the events using a duplicated logic, why wouldn't you like to do the same > thing with the idleness? > > Also keep in mind that I would expect that a user can decide whether he > wants to persist the watermarks/stream status on his own. This shouldn't be > obligatory. > > For me there is one good reason to not expose stream status YET. That is, > if we are sure that we do not need this just yet, while at the same time we > don't want to expand the Public/PublicEvolving API, as this always > increases the maintenance cost. > > Best, > Piotrek > > > pt., 4 cze 2021 o 10:57 Eron Wright >
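The heuristic debated in this thread ("we have not seen records for 5 minutes, so stop waiting for watermarks") reduces to a timer check per stream/partition. A minimal illustrative sketch follows; the class and method names are hypothetical, and this is not Flink's actual idleness implementation:

```java
import java.time.Duration;

// Illustrative sketch of the idleness heuristic discussed above: a stream
// (or partition) is considered idle once no record has arrived within a
// configured timeout. Names are hypothetical; Flink's real logic lives in
// its WatermarkStrategy/idleness machinery.
final class IdlenessDetector {
    private final long timeoutMillis;
    private long lastRecordTime;

    IdlenessDetector(Duration timeout, long nowMillis) {
        this.timeoutMillis = timeout.toMillis();
        this.lastRecordTime = nowMillis;
    }

    /** Call whenever a record arrives; the stream becomes active again. */
    void onRecord(long nowMillis) {
        lastRecordTime = nowMillis;
    }

    /** The heuristic: idle iff no record was seen within the timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastRecordTime >= timeoutMillis;
    }
}
```

As the discussion notes, this check is purely local: a downstream consumer re-running it sees different gaps than the ingesting system did, which is why the two sides can disagree about idleness.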
[jira] [Created] (FLINK-22878) Allow placeholder options in format factories
Timo Walther created FLINK-22878: Summary: Allow placeholder options in format factories Key: FLINK-22878 URL: https://issues.apache.org/jira/browse/FLINK-22878 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Timo Walther The current factory design does not allow placeholder options in {{EncodingFormatFactory}} or {{DecodingFormatFactory}}. The past has shown that placeholder options are used at a couple of locations. See FLINK-22475 or {{KafkaOptions#PROPERTIES_PREFIX}}. We should think about adding an additional functionality to {{ReadableConfig}} or a special {{ConfigOption}} type to finally solve this problem. This could also be useful for FLIP-129. And would solve the [current shortcomings for Confluent Avro registry|https://github.com/apache/flink/pull/15808#discussion_r645494282]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
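For illustration, the "placeholder option" pattern referenced above (cf. KafkaOptions#PROPERTIES_PREFIX) usually means collecting every key under a prefix such as 'properties.', stripping the prefix, and forwarding the remainder to an underlying client. A hedged sketch of that pattern — not the actual Flink factory API, and all names below are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the prefix/placeholder option pattern: options like
// 'properties.group.id' are stripped of the 'properties.' prefix and
// forwarded verbatim, which is exactly what the current ConfigOption
// machinery cannot express declaratively.
final class PrefixedOptions {
    static Map<String, String> withPrefix(Map<String, String> options, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```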
[jira] [Created] (FLINK-22877) Remove BatchTableEnvironment and related API classes
Timo Walther created FLINK-22877: Summary: Remove BatchTableEnvironment and related API classes Key: FLINK-22877 URL: https://issues.apache.org/jira/browse/FLINK-22877 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Timo Walther Assignee: Timo Walther Remove BatchTableEnvironment and other DataSet related API classes.
Re: [Discuss] Planning Flink 1.14
Hi all, @Xintong Song Thanks for reminding me, I will contact Jark to update the wiki page. Besides, I'd like to provide more input by sharing our experience upgrading our internal version of Flink. Flink has been widely used in the production environment since 2018 in our company. Our internal version is behind the latest stable version of the community by about 1 year. We upgraded the internal Flink version to 1.10 in March last year, and we plan to upgrade directly to 1.13 next month (skipping the 1.11 and 1.12 versions). We wish to use the latest version as soon as possible. However, in fact we follow the community's latest stable release almost once a year because upgrading to a new version is a time-consuming process. I list the detailed work as follows.
a. Before releasing a new internal version:
1) Required: Cherry-pick internal features to the new Flink branch. A few features need to be redeveloped based on the new branch code base. BTW, this cost grows heavier and heavier since we maintain more and more internal features in our internal version.
2) Optional: Some internal connectors need to adapt to the new API
3) Required: Surrounding products need to be updated based on the new API, for example, the internal Flink SQL web development platform
4) Required: Regression tests
b. After the release, encourage users to upgrade existing jobs (thousands of jobs) to the new version. Users need some time to:
1) Repackage jars for DataStream jobs
2) For critical jobs, run jobs on the two versions at the same time for a while, and migrate to the new job only after comparing the data carefully.
3) Pure ETL SQL jobs are easy to bump up. But other Flink SQL jobs with stateful operators need extra effort because Flink SQL jobs do not support state compatibility yet.
Best regards, JING ZHANG Prasanna kumar wrote on Fri, Jun 4, 2021 at 2:27 PM: > Hi all, > > We are using Flink for our eventing system. 
Overall we are very happy with > the tech, documentation and community support and quick replies in mails. > > Here is my experience with versions over the last year. > > We were working on 1.10 initially during our research phase, then we > stabilised with 1.11 as we moved on, but by the time we were about to get > into production 1.12 was released. As with all software and products, > there were bugs reported. So we waited till 1.12.2 was released and then > upgraded. Within a month of us doing it 1.13 got released. > > From past experience, we wait till at least a couple of minor > versions (fixing bugs) get released before we move onto a newer version. > The development happens at a rapid/good pace in Flink (which is good in > terms of features) but adoption and moving the production code to a newer > version 3/4 times a year is an onerous effort. For example, the memory > model was changed in one of the releases (there is good documentation). > But as a production user, to adopt the newer version, at least a month of > testing is required with a huge-scale environment. We also do not want to > be behind more than 2 versions at any point of time. > > I personally feel 2 major releases a year, or at most a release once every 5 months, > is good. > > Thanks > Prasanna. > > On Fri, Jun 4, 2021 at 9:38 AM Xintong Song wrote: > > > Thanks everyone for the feedback. > > > > @Jing, > > Thanks for the inputs. Could you please ask a committer who works > together > > with you on these items to fill them into the feature collecting wiki > page > > [1]? I assume Jark, who co-edited the flip wiki page, is working with > you? > > > > @Kurt, @Till and @Seth, > > First of all, a few things that potentially demotivate users from > > upgrading, observed from users that I've been in touch with. > > 1. It takes time for Flink major releases to get stabilized. Many users > > tend to wait for the bugfix releases (x.y.1/2, or even x.y.3/4) > rather > > than upgrading to x.y.0 immediately. 
This could take months, sometimes > even > > after the next major release. > > 2. Many users maintain an internal version of Flink, with customized > > features for their specific businesses. For them, upgrading Flink > requires > > significant effort to rebase those customized features. On the other > hand, > > the more versions they are left behind, the harder it is to contribute those > > features to the community, becoming a vicious cycle. > > > > I think the question to be answered is how do we prioritize between > > stabilizing a previous major release and casting a new major release. So > > far, it feels like the new release takes priority. I recall that we have waited > > for weeks to release 1.11.3 because people were busy stabilizing 1.12.0. > > What if more resources leaned toward the bugfix releases? We may have a > more > > explicit schedule for the bugfix releases. E.g., try to always release > the > > first bugfix release 2 weeks after the major release, the second bugfix > > release 4 weeks after that, and release on-demand starting from the
[jira] [Created] (FLINK-22876) Adding SharedObjects junit rule to ease test development
Arvid Heise created FLINK-22876: --- Summary: Adding SharedObjects junit rule to ease test development Key: FLINK-22876 URL: https://issues.apache.org/jira/browse/FLINK-22876 Project: Flink Issue Type: Improvement Components: Test Infrastructure Affects Versions: 1.14.0 Reporter: Arvid Heise Assignee: Arvid Heise Fix For: 1.14.0 Most tests rely on static variables to sync between test code and UDFs to avoid serialization issues. However, static variables are error-prone and the tests look quaint. SharedObjects allows test developers to forget about the serialization and just use the objects across thread boundaries. The main idea is that shared objects are bound to the scope of a test case instead of a class. That allows us to:
* get rid of all nasty reset methods for reused parts and, most importantly, avoid test bugs that result from us forgetting to reset (think of a latch in a shared WaitingSink)
* it's easier to reason about the test setup
* it will allow us to share more code and provide more primitives (e.g. have some canonical way to wait for certain events)
* run tests in parallel that reuse classes depending on shared objects
* it will also be significantly easier for contributors to write tests; all student groups had issues with this synchronization of test code and UDFs.
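The pattern such a rule typically builds on can be sketched as a static registry plus a small serializable handle: the object itself never leaves the test JVM, and only the handle travels through serialization into the UDF. Everything below is a hypothetical illustration of that idea, not the actual SharedObjects implementation:

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the shared object stays in a static registry, and a
// tiny serializable handle resolves back to it on the same JVM. Scoping
// handles to a single test case is what removes the need for reset()
// methods between tests.
final class SharedRegistry {
    private static final Map<Long, Object> OBJECTS = new ConcurrentHashMap<>();
    private static final AtomicLong IDS = new AtomicLong();

    static <T> Handle<T> add(T object) {
        long id = IDS.incrementAndGet();
        OBJECTS.put(id, object);
        return new Handle<>(id);
    }

    static final class Handle<T> implements Serializable {
        private final long id;

        private Handle(long id) {
            this.id = id;
        }

        @SuppressWarnings("unchecked")
        T get() {
            return (T) OBJECTS.get(id);
        }
    }
}
```

In a MiniCluster-style test everything runs in one JVM, so a handle serialized into a UDF resolves to the very same instance the test code holds.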
[jira] [Created] (FLINK-22875) Cannot work by Pickler and Unpickler
JYXL created FLINK-22875: Summary: Cannot work by Pickler and Unpickler Key: FLINK-22875 URL: https://issues.apache.org/jira/browse/FLINK-22875 Project: Flink Issue Type: Bug Reporter: JYXL
Re: [DISCUSS] Watermark propagation with Sink API
Hi, Thanks for picking up this discussion. For the record, I also think we shouldn't expose latency markers. About the stream status > Persisting the StreamStatus I don't agree with the view that sinks are "storing" the data/idleness status. This nomenclature makes only sense if we are talking about streaming jobs producing batch data. > In my understanding a StreamStatus makes sense only when talking about > immediately consumed transient channels such as between operators within > a single job. Same as Eron I don't follow this point. Any streaming sink can be used as this kind of transient channel. Streaming sinks, like Kafka, are also used to connect one streaming system with another one, also for an immediate consumption. You could say the same thing about watermarks (note they are usually generated in Flink based on the incoming events) and I would not agree with it in the same way. Both watermarks and idleness status can be some inherent property of the underlying data stream. if an upstream/ingesting system knows that this particular stream/partition of a stream is going idle (for example for a couple of hours), why does this information have to be re-created in the downstream system using some heuristic? It could be explicitly encoded. If you want to pass watermarks explicitly to a next downstream streaming system, because you do not want to recreate them from the events using a duplicated logic, why wouldn't you like to do the same thing with the idleness? Also keep in mind that I would expect that a user can decide whether he wants to persist the watermarks/stream status on his own. This shouldn't be obligatory. For me there is one good reason to not expose stream status YET. That is, if we are sure that we do not need this just yet, while at the same time we don't want to expand the Public/PublicEvolving API, as this always increases the maintenance cost. 
Best, Piotrek pt., 4 cze 2021 o 10:57 Eron Wright napisał(a): > I believe that the correctness of watermarks and stream status markers is > determined entirely by the source (ignoring the generic assigner). Such > stream elements are known not to overtake records, and aren't transient > from a pipeline perspective. I do agree that recoveries may be lossy if > some operator state is transient (e.g. valve state). > > Consider that status markers already affect the flow of watermarks (e.g. > suppression), and thus affect operator behavior. Seems to me that exposing > the idleness state is no different than exposing a watermark. > > The high-level story is, there is a need for the Flink job to be > transparent or neutral with respect to the event time clock. I believe > this is possible if time flows with high fidelity from source to sink. Of > course, one always has the choice as to whether to use source-based > watermarks; as you mentioned, requirements vary. > > Regarding the Pulsar specifics, we're working on a community proposal that > I'm anxious to share. To answer your question, the broker aggregates > watermarks from multiple producers who are writing to a single topic. > Each sink > subtask is a producer. The broker considers each producer's assertions > (watermarks, idleness) to be independent inputs, much like the case with > the watermark valve. > > On your concern about idleness causing false late events, I understand your > point but don't think it applies if the keyspace assignments are stable. > > I hope this explains to your satisfaction. > > - Eron > > > > > > On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz > wrote: > > > Hi Eron, > > > > I might be missing some background on Pulsar partitioning but something > > seems off to me. What is the chunk/batch/partition that Pulsar brokers > > will additionally combine watermarks for? 
Isn't it the case that only a > > single Flink sub-task would write to such a chunk and thus will produce > > an aggregated watermark already via the writeWatermark method? > > > > Personally I am really skeptical about exposing the StreamStatus in any > > Producer API. In my understanding the StreamStatus is a transient > > setting of a consumer of data. StreamStatus is a mechanism for making a > > tradeoff between correctness (how many late elements that are behind > > watermark we have) vs making progress. IMO one has to be extra cautious > > when it comes to persistent systems. Again I might be missing the exact > > use case you are trying to solve here, but I can imagine multiple jobs > > reading from such a stream which might have different correctness > > requirements. Just quickly throwing an idea out of my head you might > > want to have an entirely correct results which can be delayed for > > minutes, and a separate task that produces quick insights within > > seconds. Another thing to consider is that by the time the downstream > > job starts consuming the upstream one might have produced records to the > > previously idle chunk. Persisting the StreamStatus in such a
[jira] [Created] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table
Spongebob created FLINK-22874: - Summary: flink table partition trigger doesn't effect as expectation when sink into hive table Key: FLINK-22874 URL: https://issues.apache.org/jira/browse/FLINK-22874 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.13.1 Reporter: Spongebob I am trying to sink into a hive partitioned table whose partition commit trigger is declared as "partition-time", and I had assigned a watermark on the dataStream. When I input some data into the dataStream, it cannot commit the hive partitions on time. Here's my code
{code:java}
//ddl of hive table
create table test_table(username string)
partitioned by (ts bigint)
stored as orc
TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);{code}
{code:java}
// flink application code
val streamEnv = ...
val dataStream: DataStream[(String, Long)] = ...

// assign watermark and output watermark info in processFunction
class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
  override def processElement(value: (String, Long), ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: Collector[(String, Long, Long)]): Unit = {
    out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
  }
}

val resultStream = dataStream
  .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
    .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
      override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
        element._2 * 1000
      }
    }))
  .process(new MyProcessFunction)

//
val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())

// convert dataStream into hive catalog table and sink into hive
streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
val catalog = ... 
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql("insert into test_table select _1,_2 from default_catalog.default_database.test_catalog_t").print()

// flink uses the default parallelism 4

// input data
(a, 1)
(b, 2)
(c, 3)
(d, 4)
(a, 5)
...

// result: there are many partition directories on hdfs, but they are all in-progress files and are never committed to the hive metastore.{code}
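For context, the 'partition-time' trigger commits a partition only once the watermark passes the partition's extracted time plus the commit delay, and the effective watermark is the minimum across all parallel subtasks. The sketch below (names hypothetical, not the actual Flink committer code) illustrates why this symptom can appear at parallelism 4: a subtask that never receives data keeps its watermark at Long.MIN_VALUE, which holds back every commit — one common cause, though not necessarily the cause of this particular report:

```java
// Sketch of the 'partition-time' commit condition: a partition is
// committable once watermark >= partitionTime + commitDelay, where the
// watermark is the minimum over all parallel subtasks. A subtask that has
// seen no data (watermark stuck at Long.MIN_VALUE) therefore blocks all
// commits, leaving files in-progress indefinitely.
final class PartitionCommitCheck {
    static long minWatermark(long[] subtaskWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : subtaskWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    static boolean committable(long watermark, long partitionTime, long commitDelay) {
        return watermark >= partitionTime + commitDelay;
    }
}
```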
Re: [DISCUSS] Dashboard/HistoryServer authentication
> I did not mean for the user to sign its own certificates but for the operator of the cluster. Once the user request hits the proxy, it should no longer be under his control. I think I do not fully understand yet why this would not work. I said it's not solving the authentication problem over any proxy. Even if the operator signs the certificates, one can gain access to an internal node. In that case anybody can craft certificates that are accepted by the server, and once accepted, a bad actor can cancel jobs, causing huge impact. > Also, I am missing a bit the comparison of Kerberos to other authentication mechanisms and why they were rejected in favour of Kerberos.
PROS:
* Since it does not depend on the cloud provider and/or on k8s, bare-metal, etc. deployments, this is the biggest plus
* Centralized, with tooling, and no need to write tons of tools around it
* There are clients/tools on almost all OSes and in several languages
* Super huge users have been using it for years in production w/o huge issues
* Provides cross-realm trust possibilities, among other features
* Several open source components use it, which could increase compatibility
CONS:
* Not everybody uses Kerberos
* It would increase the code footprint, but this is true for many features (as a side note, I'm here to maintain it)
Feel free to add your points because this only represents a single viewpoint. Also, if you have any better option for strong authentication, please share it and we can consider the pros/cons here. BR, G On Fri, Jun 4, 2021 at 10:32 AM Till Rohrmann wrote: > I did not mean for the user to sign its own certificates but for the > operator of the cluster. Once the user request hits the proxy, it should no > longer be under his control. I think I do not fully understand yet why this > would not work. > > What I would like to avoid is to add more complexity into Flink if there > is an easy solution which fulfills the requirements. 
That's why I would > like to exercise thoroughly through the different alternatives. Also, I am > missing a bit the comparison of Kerberos to other authentication mechanisms > and why they were rejected in favour of Kerberos. > > Cheers, > Till > > On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra wrote: > >> Hi! >> >> I think there might be possible alternatives but it seems Kerberos on the >> rest endpoint ticks all the right boxes and provides a super clean and >> simple solution for strong authentication. >> >> I wouldn’t even consider sidecar proxies etc if we can solve it in such a >> simple way as proposed by G. >> >> Cheers >> Gyula >> >> On Fri, 4 Jun 2021 at 10:03, Till Rohrmann wrote: >> >>> I am not saying that we shouldn't add a strong authentication mechanism >>> if there are good reasons for it. I primarily would like to understand the >>> context a bit better in order to give qualified feedback and come to a good >>> decision. In order to do this, I have the feeling that we haven't fully >>> considered all available options which are on the table, tbh. >>> >>> Does the problem of certificate expiry also apply for self-signed >>> certificates? If yes, then this should then also be a problem for the >>> internal encryption of Flink's communication. If not, then one could use >>> self-signed certificates with a longer validity to solve the mentioned >>> issue. >>> >>> I think you can set up Flink in such a way that you don't have to handle >>> all the different certificates. For example, you could deploy Flink with a >>> "sidecar proxy" which is responsible for the authentication using an >>> arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local >>> network interface. That way, the REST endpoint would only be available >>> through the sidecar proxy. Additionally, one could enable SSL for this >>> communication. Would this be a solution for the problem? 
>>> >>> Cheers, >>> Till >>> >>> On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi >>> wrote: >>> That is an interesting idea, Till. The main issue with it is that TLS certificates have an expiration time, usually they get approved for a couple years. Forcing our users to restart jobs to reprovision TLS certificates would be weird when we could just implement a single proper strong authentication mechanism instead in a couple hundred lines of code. :-) In many cases it is also impractical to go the TLS mutual route, because the Flink Dashboard can end up on any node in the k8s/Yarn cluster which means that we need a certificate per node (due to the mutual auth), but if we also want to protect the private key of these from users accidentally or intentionally leaking them then we need this per user. As in we end up managing user*machine number certificates and having to renew them periodically, which albeit automatable is unfortunately not yet automated in all large organizations. I fully agree that TLS certificate mutual authentication has its nice
Re: [DISCUSS] Watermark propagation with Sink API
I believe that the correctness of watermarks and stream status markers is determined entirely by the source (ignoring the generic assigner). Such stream elements are known not to overtake records, and aren't transient from a pipeline perspective. I do agree that recoveries may be lossy if some operator state is transient (e.g. valve state). Consider that status markers already affect the flow of watermarks (e.g. suppression), and thus affect operator behavior. Seems to me that exposing the idleness state is no different than exposing a watermark. The high-level story is, there is a need for the Flink job to be transparent or neutral with respect to the event time clock. I believe this is possible if time flows with high fidelity from source to sink. Of course, one always has the choice as to whether to use source-based watermarks; as you mentioned, requirements vary. Regarding the Pulsar specifics, we're working on a community proposal that I'm anxious to share. To answer your question, the broker aggregates watermarks from multiple producers who are writing to a single topic. Each sink subtask is a producer. The broker considers each producer's assertions (watermarks, idleness) to be independent inputs, much like the case with the watermark valve. On your concern about idleness causing false late events, I understand your point but don't think it applies if the keyspace assignments are stable. I hope this explains to your satisfaction. - Eron On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz wrote: > Hi Eron, > > I might be missing some background on Pulsar partitioning but something > seems off to me. What is the chunk/batch/partition that Pulsar brokers > will additionally combine watermarks for? Isn't it the case that only a > single Flink sub-task would write to such a chunk and thus will produce > an aggregated watermark already via the writeWatermark method? > > Personally I am really skeptical about exposing the StreamStatus in any > Producer API. 
In my understanding the StreamStatus is a transient > setting of a consumer of data. StreamStatus is a mechanism for making a > tradeoff between correctness (how many late elements that are behind > watermark we have) vs making progress. IMO one has to be extra cautious > when it comes to persistent systems. Again I might be missing the exact > use case you are trying to solve here, but I can imagine multiple jobs > reading from such a stream which might have different correctness > requirements. Just quickly throwing an idea out of my head you might > want to have an entirely correct results which can be delayed for > minutes, and a separate task that produces quick insights within > seconds. Another thing to consider is that by the time the downstream > job starts consuming the upstream one might have produced records to the > previously idle chunk. Persisting the StreamStatus in such a scenario > would add unnecessary false late events. > > In my understanding a StreamStatus makes sense only when talking about > immediately consumed transient channels such as between operators within > a single job. > > Best, > > Dawid > > On 03/06/2021 23:31, Eron Wright wrote: > > I think the rationale for end-to-end idleness (i.e. between pipelines) is > > the same as the rationale for idleness between operators within a > > pipeline. On the 'main issue' you mentioned, we entrust the source with > > adapting to Flink's notion of idleness (e.g. in Pulsar source, it means > > that no topics/partitions are assigned to a given sub-task); a similar > > adaption would occur in the sink. In other words, I think it reasonable > > that a sink for a watermark-aware storage system has need for the > idleness > > signal. > > > > Let me explain how I would use it in Pulsar's sink. Each sub-task is a > > Pulsar producer, and is writing watermarks to a configured topic via the > > Producer API. 
The Pulsar broker aggregates the watermarks that are > written > > by each producer into a global minimum (similar to StatusWatermarkValve). > > The broker keeps track of which producers are actively producing > > watermarks, and a producer may mark itself as idle to tell the broker not > > to wait for watermarks from it, e.g. when a producer is going offline. I > > had intended to mark the producer as idle when the sub-task is closing, > but > > now I see that it would be insufficient; the producer should also be > idled > > if the sub-task is idled. Otherwise, the broker would wait indefinitely > > for the idled sub-task to produce a watermark. > > > > Arvid, I think your original instincts were correct about idleness > > propagation, and I hope I've demonstrated a practical use case. > > > > > > > > On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise wrote: > > > >> When I was rethinking the idleness issue, I came to the conclusion that > it > >> should be inferred at the source of the respective downstream pipeline > >> again. > >> > >> The main issue on propagating
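The broker-side aggregation Eron describes (a global minimum over all active producers, with producers that marked themselves idle excluded so they cannot hold the watermark back indefinitely) can be sketched as follows. This is an illustration of the stated behavior, not Pulsar's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Sketch of the described broker behavior: each producer asserts its own
// watermark, the topic watermark is the minimum over all active producers,
// and a producer marked idle is dropped from the aggregation (much like
// Flink's StatusWatermarkValve ignores idle channels).
final class TopicWatermarkAggregator {
    private final Map<String, Long> producerWatermarks = new HashMap<>();

    void onWatermark(String producerId, long watermark) {
        producerWatermarks.put(producerId, watermark);
    }

    void markIdle(String producerId) {
        producerWatermarks.remove(producerId);
    }

    /** Empty when no active producer has asserted a watermark yet. */
    OptionalLong topicWatermark() {
        return producerWatermarks.values().stream().mapToLong(Long::longValue).min();
    }
}
```

This also shows why the sink needs the idleness signal: without markIdle, an idled sub-task's stale watermark would pin the topic watermark forever.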
[jira] [Created] (FLINK-22873) Add ToC to configuration documentation
Chesnay Schepler created FLINK-22873: Summary: Add ToC to configuration documentation Key: FLINK-22873 URL: https://issues.apache.org/jira/browse/FLINK-22873 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.13.1 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.13.2
Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, Many thanks @Dawid for resuming the discussion and many thanks @Till for the summary! (and very sorry that I missed the mail and did not respond in time...) I also agree that we could consider the global commits later, separately, after we have addressed the final checkpoints, and also the other points Till summarized. Currently the only cases that use the cascade commit are the Table FileSystem and Hive connectors. I checked the code and found that currently they commit the last piece of data directly in endOfInput(). Although this might emit repeated records if there is a failover during job finishing, it avoids emitting the records in notifyCheckpointComplete() after endOfInput(), thus the modification to the operator lifecycle in final checkpoints would not cause compatibility problems for these connectors, and we do not need to modify them in the first place. 2. Regarding the operator lifecycle, I also agree with the proposed changes. To sum up, I think the operator lifecycle would become
endOfInput(1)
...
endOfInput(n)
flush() --> call UDF's flush method
if some operator requires final checkpoints
  snapshotState()
  notifyCheckpointComplete()
end if
close() --> call UDF's close method
Since currently close() is only called on a normal finish and dispose() is called in both the failover and the normal case, for compatibility, I think we may have to postpone the change to a single close() method to version 2.0? 3. Regarding the name and position of the flush() method, I also agree that we will need a separate method to mark the termination of the whole stream for multiple-input streams. Would it also be ok to make some modifications to the current BoundedXXInput interfaces, e.g.
interface BoundedInput {
  void endInput(); // marks the end of the whole stream, as flush() does.
}

@deprecated // In the future we could remove this interface
interface BoundedOneInput extends BoundedInput {}

interface BoundedMultiInput extends BoundedInput {
  void endInput(int i);
  default void endInput() {} // For compatibility
}
If an operator/UDF does not care about the end of a single input, then it could directly implement the BoundedInput interface. The possible benefit to me is that we might be able to keep only one concept for marking the end of a stream, especially for operators with only one input. Many thanks for all the deep insights and discussions! Best, Yun -- From: Dawid Wysakowicz Send Time: 2021 Jun. 3 (Thu.) 21:21 To: dev ; Till Rohrmann ; Yun Gao Cc: Piotr Nowojski ; Guowei Ma ; Stephan Ewen Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished Hi all, Thanks for the very insightful discussion. I'd like to revive the effort of FLIP-147. First of all, from my side I'd like to say that I am really interested in helping that happen in the upcoming 1.14 release. I agree with Till that the final checkpoints and global commits are mostly orthogonal. Similarly to Till, I'd suggest to first focus on the final checkpoints, while just keeping in mind that we should not make assumptions that would make it impossible to implement the global commits. So far I do not see such a risk in the discussion. Going back to the final checkpoints issue, I think the only outstanding issue is which methods we want to use for flushing/closing both operators and UDFs just before performing the final checkpoint. As pointed out to me by Piotr, I am mentioning UDFs here as well, because we need a way for users using the Public API to benefit from the final checkpoint (bear in mind that e.g. TwoPhaseCommitSinkFunction, which is implemented by our Kafka sink, operates on the UDF level). Right now RichFunction has no method which could be called just before the final checkpoint that would say "flush" all intermediate state now and prepare for the final checkpoint. 
I'd suggest introducing an additional interface, e.g. (name to be determined):

interface Flushable {
    void flush(Collector out);
}

Additionally we would need to introduce a similar method on the StreamOperator level. Currently we have two methods that are called at the end of the operator lifecycle:

close
dispose

The usage of the two methods is a bit confusing. Dispose is responsible for closing all open resources and is supposed to be called in case of a failure. On the other hand, close is a combination of the non-existent "flush" method we need and of dispose for closing resources in case of a successful run.

I'd suggest to clear it up a bit. We would introduce a proper "flush" method which would be called in case of a successful finishing of an operator. Moreover, we would make "close" deal only with closing any open resources, basically taking over the role of dispose, which we would deprecate.

Lastly, I'd like to say why I think it is better to introduce a new "flush" method instead of using the "endInput"
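A minimal sketch of the flush/close split described above. The method names follow the mail, but the API is hypothetical and the buffering operator is invented for illustration: flush() drains buffered records on a successful finish, while close() only releases resources and would run on both the success and the failure path:

```java
import java.util.ArrayList;
import java.util.List;

public class FlushCloseSketch {
    interface Collector { void collect(String record); }

    static class BufferingOperator {
        private final List<String> buffer = new ArrayList<>();
        boolean resourcesOpen = true;

        void process(String record) { buffer.add(record); }

        // Called only on a successful finish, just before the final checkpoint:
        // emit any buffered intermediate state.
        void flush(Collector out) {
            buffer.forEach(out::collect);
            buffer.clear();
        }

        // Called on success and failure alike; only releases resources
        // (the role dispose() plays today).
        void close() { resourcesOpen = false; }
    }

    public static void main(String[] args) {
        BufferingOperator op = new BufferingOperator();
        List<String> emitted = new ArrayList<>();
        op.process("a");
        op.process("b");
        op.flush(emitted::add); // success path: drain the buffer first
        op.close();
        System.out.println(emitted); // [a, b]
    }
}
```

On a failure path only close() would run, so no buffered records would be emitted, which is exactly the behavior the proposal wants to make explicit.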
Re: [DISCUSS] Dashboard/HistoryServer authentication
I did not mean for the user to sign their own certificates but for the operator of the cluster. Once the user request hits the proxy, it should no longer be under their control. I think I do not fully understand yet why this would not work. What I would like to avoid is to add more complexity into Flink if there is an easy solution which fulfills the requirements. That's why I would like to work through the different alternatives thoroughly. Also, I am missing a bit the comparison of Kerberos to other authentication mechanisms and why they were rejected in favour of Kerberos.

Cheers,
Till

On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra wrote:
> Hi!
>
> I think there might be possible alternatives but it seems Kerberos on the rest endpoint ticks all the right boxes and provides a super clean and simple solution for strong authentication.
>
> I wouldn't even consider sidecar proxies etc if we can solve it in such a simple way as proposed by G.
>
> Cheers
> Gyula
>
> On Fri, 4 Jun 2021 at 10:03, Till Rohrmann wrote:
>
>> I am not saying that we shouldn't add a strong authentication mechanism if there are good reasons for it. I primarily would like to understand the context a bit better in order to give qualified feedback and come to a good decision. In order to do this, I have the feeling that we haven't fully considered all available options which are on the table, tbh.
>>
>> Does the problem of certificate expiry also apply for self-signed certificates? If yes, then this should also be a problem for the internal encryption of Flink's communication. If not, then one could use self-signed certificates with a longer validity to solve the mentioned issue.
>>
>> I think you can set up Flink in such a way that you don't have to handle all the different certificates. For example, you could deploy Flink with a "sidecar proxy" which is responsible for the authentication using an arbitrary method (e.g.
Kerberos) and then bind the REST endpoint to a local >> network interface. That way, the REST endpoint would only be available >> through the sidecar proxy. Additionally, one could enable SSL for this >> communication. Would this be a solution for the problem? >> >> Cheers, >> Till >> >> On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi >> wrote: >> >>> That is an interesting idea, Till. >>> >>> The main issue with it is that TLS certificates have an expiration time, >>> usually they get approved for a couple years. Forcing our users to restart >>> jobs to reprovision TLS certificates would be weird when we could just >>> implement a single proper strong authentication mechanism instead in a >>> couple hundred lines of code. :-) >>> >>> In many cases it is also impractical to go the TLS mutual route, because >>> the Flink Dashboard can end up on any node in the k8s/Yarn cluster which >>> means that we need a certificate per node (due to the mutual auth), but if >>> we also want to protect the private key of these from users accidentally or >>> intentionally leaking them then we need this per user. As in we end up >>> managing user*machine number certificates and having to renew them >>> periodically, which albeit automatable is unfortunately not yet automated >>> in all large organizations. >>> >>> I fully agree that TLS certificate mutual authentication has its nice >>> properties, especially at very large (multiple thousand node) clusters - >>> but it has its own challenges too. Thanks for bringing it up. >>> >>> Happy to have this added to the rejected alternative list so that we >>> have the full picture documented. >>> >>> On Thu, Jun 3, 2021 at 5:52 PM Till Rohrmann >>> wrote: >>> I guess the idea would then be to let the proxy do the authentication job and only forward the request via an SSL mutually encrypted connection to the Flink cluster. Would this be possible? 
The beauty of this setup is, in my opinion, that it should work with all kinds of authentication mechanisms.

Cheers,
Till

On Thu, Jun 3, 2021 at 3:12 PM Gabor Somogyi wrote:
> Thanks for giving options to fulfil the need.
>
> Users are looking for a solution where users can be identified on the whole cluster and access to resources/actions can be restricted.
> A good example for such an action is cancelling other users' running jobs.
>
> * SSL does provide mutual authentication, but once authentication has passed no user-based restrictions can be enforced.
> * The less problematic part is that generating/maintaining certificates with a short validity would be hard (that's the reason KDC-like servers exist).
> Having certificates with a long validity would widen the attack surface, but since the first concern is there this is just a cosmetic issue.
>
> All in all, using TLS certificates is not sufficient in these environments, unfortunately.
>
> BR,
> G
>
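For reference, the mutual SSL authentication for the REST endpoint that Till points to later in this thread is enabled via flink-conf.yaml options along these lines (a sketch based on the linked security documentation; paths and passwords are placeholders):

```yaml
# Enable TLS for the REST endpoint and require client certificates (mutual auth).
security.ssl.rest.enabled: true
security.ssl.rest.authentication-enabled: true
security.ssl.rest.keystore: /path/to/rest.keystore
security.ssl.rest.keystore-password: <keystore-password>
security.ssl.rest.key-password: <key-password>
security.ssl.rest.truststore: /path/to/rest.truststore
security.ssl.rest.truststore-password: <truststore-password>
```

As Gabor notes above, this authenticates the client connection but provides no per-user identity on which authorization decisions (such as restricting job cancellation) could be based.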
Re: [DISCUSS] Dashboard/HistoryServer authentication
Hi! I think there might be possible alternatives but it seems Kerberos on the rest endpoint ticks all the right boxes and provides a super clean and simple solution for strong authentication.

I wouldn't even consider sidecar proxies etc if we can solve it in such a simple way as proposed by G.

Cheers
Gyula

On Fri, 4 Jun 2021 at 10:03, Till Rohrmann wrote:
> [...]

On Thu, Jun 3, 2021 at 12:49 PM Till Rohrmann wrote:
> Thanks for the information Gabor. If it is about securing the communication between the REST client and the REST server, then Flink already supports enabling mutual SSL authentication [1]. Would this be enough to secure the communication and to pass an audit?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/security/security-ssl/#external--rest-connectivity
>
> Cheers,
> Till
Re: [DISCUSS] Dashboard/HistoryServer authentication
Till, thanks for investing time in giving further options. Marci, thanks for summarizing the use-case point of view.

We've arrived back at one of the original problems: if an attacker gets access to a node, it's possible to cancel other users' jobs (and more can be done). A self-signed certificate is almost no-op authentication in production environments, because any user can sign their own certificate and no third party is involved. This problem just can't be solved with SSL, no matter from which point of view we consider it.

BR,
G

On Fri, Jun 4, 2021 at 10:03 AM Till Rohrmann wrote:
> [...]
Re: [DISCUSS] Dashboard/HistoryServer authentication
I am not saying that we shouldn't add a strong authentication mechanism if there are good reasons for it. I primarily would like to understand the context a bit better in order to give qualified feedback and come to a good decision. In order to do this, I have the feeling that we haven't fully considered all available options which are on the table, tbh. Does the problem of certificate expiry also apply for self-signed certificates? If yes, then this should then also be a problem for the internal encryption of Flink's communication. If not, then one could use self-signed certificates with a longer validity to solve the mentioned issue. I think you can set up Flink in such a way that you don't have to handle all the different certificates. For example, you could deploy Flink with a "sidecar proxy" which is responsible for the authentication using an arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local network interface. That way, the REST endpoint would only be available through the sidecar proxy. Additionally, one could enable SSL for this communication. Would this be a solution for the problem? Cheers, Till On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi wrote: > That is an interesting idea, Till. > > The main issue with it is that TLS certificates have an expiration time, > usually they get approved for a couple years. Forcing our users to restart > jobs to reprovision TLS certificates would be weird when we could just > implement a single proper strong authentication mechanism instead in a > couple hundred lines of code. :-) > > In many cases it is also impractical to go the TLS mutual route, because > the Flink Dashboard can end up on any node in the k8s/Yarn cluster which > means that we need a certificate per node (due to the mutual auth), but if > we also want to protect the private key of these from users accidentally or > intentionally leaking them then we need this per user. 
> [...]

On Thu, Jun 3, 2021 at 10:33 AM Gabor Somogyi < gabor.g.somo...@gmail.com> wrote:
> Hi Till,
>
> Since I'm working in security area 10+ years let me share my thought.
> I would like to emphasise there are experts better than me but I have some basics.
> The discussion is open and not trying to tell alone things...
>
> > I mean if an attacker can get access to one of the machines, then it
> > should also be possible to obtain the right Kerberos token.
> Not necessarily. For example if one gets access to a specific user's credentials then it's not possible to compromise other user's jobs, data, etc...
> Security is like an onion,
Re: [DISCUSS] Watermark propagation with Sink API
Hi Eron,

I might be missing some background on Pulsar partitioning, but something seems off to me. What is the chunk/batch/partition that Pulsar brokers will additionally combine watermarks for? Isn't it the case that only a single Flink sub-task would write to such a chunk and thus already produces an aggregated watermark via the writeWatermark method?

Personally I am really skeptical about exposing the StreamStatus in any Producer API. In my understanding the StreamStatus is a transient setting of a consumer of data. StreamStatus is a mechanism for making a tradeoff between correctness (how many late elements, i.e. elements behind the watermark, we have) vs. making progress. IMO one has to be extra cautious when it comes to persistent systems. Again, I might be missing the exact use case you are trying to solve here, but I can imagine multiple jobs reading from such a stream which might have different correctness requirements. Just quickly throwing an idea out of my head: you might want entirely correct results which can be delayed for minutes, and a separate task that produces quick insights within seconds.

Another thing to consider is that by the time the downstream job starts consuming, the upstream one might have produced records to the previously idle chunk. Persisting the StreamStatus in such a scenario would add unnecessary false late events. In my understanding a StreamStatus makes sense only when talking about immediately consumed transient channels, such as between operators within a single job.

Best,
Dawid

On 03/06/2021 23:31, Eron Wright wrote:
> I think the rationale for end-to-end idleness (i.e. between pipelines) is the same as the rationale for idleness between operators within a pipeline. On the 'main issue' you mentioned, we entrust the source with adapting to Flink's notion of idleness (e.g. in the Pulsar source, it means that no topics/partitions are assigned to a given sub-task); a similar adaptation would occur in the sink.
In other words, I think it reasonable > that a sink for a watermark-aware storage system has need for the idleness > signal. > > Let me explain how I would use it in Pulsar's sink. Each sub-task is a > Pulsar producer, and is writing watermarks to a configured topic via the > Producer API. The Pulsar broker aggregates the watermarks that are written > by each producer into a global minimum (similar to StatusWatermarkValve). > The broker keeps track of which producers are actively producing > watermarks, and a producer may mark itself as idle to tell the broker not > to wait for watermarks from it, e.g. when a producer is going offline. I > had intended to mark the producer as idle when the sub-task is closing, but > now I see that it would be insufficient; the producer should also be idled > if the sub-task is idled. Otherwise, the broker would wait indefinitely > for the idled sub-task to produce a watermark. > > Arvid, I think your original instincts were correct about idleness > propagation, and I hope I've demonstrated a practical use case. > > > > On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise wrote: > >> When I was rethinking the idleness issue, I came to the conclusion that it >> should be inferred at the source of the respective downstream pipeline >> again. >> >> The main issue on propagating idleness is that you would force the same >> definition across all downstream pipelines, which may not be what the user >> intended. >> On the other hand, I don't immediately see a technical reason why the >> downstream source wouldn't be able to infer that. >> >> >> On Thu, Jun 3, 2021 at 9:14 PM Eron Wright > .invalid> >> wrote: >> >>> Thanks Piotr for bringing this up. I reflected on this and I agree we >>> should expose idleness, otherwise a multi-stage flow could stall. >>> >>> Regarding the latency markers, I don't see an immediate need for >>> propagating them, because they serve to estimate latency within a >> pipeline, >>> not across pipelines. 
One would probably need to enhance the source >>> interface also to do e2e latency. Seems we agree this aspect is out of >>> scope. >>> >>> I took a look at the code to get a sense of how to accomplish this. The >>> gist is a new `markIdle` method on the `StreamOperator` interface, that >> is >>> called when the stream status maintainer (the `OperatorChain`) >> transitions >>> to idle state. Then, a new `markIdle` method on the `SinkFunction` and >>> `SinkWriter` that is called by the respective operators. Note that >>> StreamStatus is an internal class. >>> >>> Here's a draft PR (based on the existing PR of FLINK-22700) to highlight >>> this new aspect: >>> https://github.com/streamnative/flink/pull/2/files >>> >>> Please let me know if you'd like me to proceed to update the FLIP with >>> these details. >>> >>> Thanks again, >>> Eron >>> >>> On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski >>> wrote: >>> Hi, Sorry for chipping in late in the discussion, but I would second this >>>
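To make the broker-side aggregation Eron describes concrete, here is a sketch (an assumption about the described behavior, not actual Pulsar code): the broker tracks the latest watermark per producer and combines them into a global minimum, excluding producers that marked themselves idle, so an offline or idled sub-task cannot hold the combined watermark back indefinitely.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-producer watermark aggregation with idleness, similar in
// spirit to Flink's StatusWatermarkValve. All names are invented for the
// example; this is not the Pulsar broker implementation.
public class WatermarkAggregator {
    private final Map<String, Long> watermarks = new HashMap<>();
    private final Map<String, Boolean> idle = new HashMap<>();

    // A producer reports a watermark; watermarks only move forward.
    public void onWatermark(String producer, long ts) {
        watermarks.merge(producer, ts, Math::max);
        idle.put(producer, false);
    }

    // A producer going offline (or idled upstream) tells the broker
    // not to wait for its watermarks anymore.
    public void markIdle(String producer) {
        idle.put(producer, true);
    }

    /** Global minimum over active producers; Long.MIN_VALUE if none reported. */
    public long combinedWatermark() {
        return watermarks.entrySet().stream()
                .filter(e -> !idle.getOrDefault(e.getKey(), false))
                .mapToLong(Map.Entry::getValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        WatermarkAggregator agg = new WatermarkAggregator();
        agg.onWatermark("p1", 100);
        agg.onWatermark("p2", 50);
        System.out.println(agg.combinedWatermark()); // 50: slowest active producer
        agg.markIdle("p2"); // p2 goes offline; stop waiting for it
        System.out.println(agg.combinedWatermark()); // 100: p2 no longer holds it back
    }
}
```

This illustrates why a sink-side markIdle signal matters: without it, the broker's minimum would stay pinned at the idle producer's last watermark.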
Re: [Discuss] Planning Flink 1.14
Hi all, We are using Flink for our eventing system. Overall we are very happy with the tech, the documentation, and the community support with quick replies on the mailing lists.

Here is my experience with versions over the last year. We started on 1.10 during our research phase, then stabilised on 1.11 as we moved on, but by the time we were about to go into production 1.12 was released. As with all software and products, there were bugs reported, so we waited until 1.12.2 was released and then upgraded. Within a month of our doing so, 1.13 was released. From past experience, we wait until at least a couple of bugfix versions have been released before we move to a newer version.

Development happens at a rapid pace in Flink (which is good in terms of features), but adopting a newer version and moving production code 3-4 times a year is an onerous effort. For example, the memory model was changed in one of the releases (there is good documentation). But as a production user, adopting the newer version requires at least a month of testing in a large-scale environment. We also do not want to be behind by more than 2 versions at any point in time.

I personally feel 2 major releases a year, or at most a release every 5 months, is good.

Thanks,
Prasanna.

On Fri, Jun 4, 2021 at 9:38 AM Xintong Song wrote:
> Thanks everyone for the feedback.
>
> @Jing,
> Thanks for the inputs. Could you please ask a committer who works together with you on these items to fill them into the feature collecting wiki page [1]? I assume Jark, who co-edited the FLIP wiki page, is working with you?
>
> @Kurt, @Till and @Seth,
> First of all, a few things that potentially demotivate users from upgrading, observed from users that I've been in touch with.
> 1. It takes time for Flink major releases to get stabilized. Many users tend to wait for the bugfix releases (x.y.1/2, or even x.y.3/4) rather than upgrading to x.y.0 immediately.
> This could take months, sometimes even after the next major release.
> 2. Many users maintain an internal version of Flink, with customized features for their specific businesses. For them, upgrading Flink requires significant effort to rebase those customized features. On the other hand, the more versions they are left behind, the harder it is to contribute those features back to the community, which becomes a vicious cycle.
>
> I think the question to be answered is how we prioritize between stabilizing a previous major release and cutting a new major release. So far, it feels like the new release takes priority. I recall that we waited for weeks to release 1.11.3 because people were busy stabilizing 1.12.0. What if more resources were devoted to the bugfix releases? We could have a more explicit schedule for the bugfix releases. E.g., try to always release the first bugfix release 2 weeks after the major release, the second bugfix release 4 weeks after that, and release on demand starting from the third bugfix release. Or some other rules like this. Would that help speed up the stabilization of releases and give users more confidence to upgrade earlier?
>
> A related question is how we prioritize between cutting a release and motivating more contributors. In my experience, what Kurt described, that committers cannot help contributors due to "planned features", usually happens during the release testing period or right before it (when people are struggling to catch the feature freeze). This probably indicates that currently cutting a release on time is prioritized over the contributors' experience. Do we need to change that?
>
> If extending the release period does not mean that simply more features are pushed into each release, but rather allows a longer period for the release to get stabilized while leaving more capacity for bugfix releases and helping contributors, it might be a good idea.
> To be specific, currently we have the 4-month period as 3 months of feature development + 1 month of release testing. We might consider a 5-month period as 3 months of feature development + 2 months of release testing.
>
> To sum up, I'm leaning towards extending the overall release period a bit, while keeping the period before the feature freeze. WDYT?
>
> Thank you~
>
> Xintong Song
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/1.14+Release
>
> On Thu, Jun 3, 2021 at 9:00 PM Seth Wiesman wrote:
> > Hi Everyone,
> >
> > +1 for the Release Managers. Thank you all for volunteering.
> >
> > @Till Rohrmann A common sentiment that I have heard from many users is that upgrading off of 1.9 was very difficult. In particular, a lot of people struggled to understand the new memory model. Many users who required custom memory configurations in earlier versions assumed they should carry those configurations into later versions and then found themselves with OOM and instability issues. The good news is Flink