Unsubscribe

2021-06-04 Thread lizikunn
Unsubscribe


lizikunn
lizik...@163.com



[jira] [Created] (FLINK-22883) Select view columns fail when store metadata with hive

2021-06-04 Thread ELLEX_SHEN (Jira)
ELLEX_SHEN created FLINK-22883:
--

 Summary: Select view columns fail when store metadata with hive
 Key: FLINK-22883
 URL: https://issues.apache.org/jira/browse/FLINK-22883
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: ELLEX_SHEN


1. Flink's metadata is stored in Hive. After creating and saving a new view, selecting from it fails because the columns cannot be found.

2. There is no problem on v1.12.3, but after upgrading to v1.13.0 / v1.13.1, querying the view still fails with the columns not being found.

3. Example code; the final INSERT statement below fails:

-- --- 0. Create database ---
-- Initialization
CREATE CATALOG myhive WITH (
 'type' = 'hive',
 'default-database' = 'test'
 -- 'hive-conf-dir' = '/usr/local/share/flink-1.13.0/conf' -- reads the local hive-site.xml by default
);
USE CATALOG myhive;
SET table.sql-dialect=default;
-- --- 1. Kafka source data ---
DROP TABLE IF EXISTS e_click;

CREATE TABLE e_click (
 user_id STRING,
 ts STRING,
 log_ts TIMESTAMP(3),
 proctime as PROCTIME(),
 WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
 -- Define watermark on TIMESTAMP column
) WITH (
 'connector.type' = 'kafka',
 'connector.version' = 'universal',
 'connector.properties.bootstrap.servers' = 
'master.northking.com:9092,slave1.northking.com:9092,slave2.northking.com:9092',
 'connector.topic' = 'user_clicks',
 'connector.properties.group.id' = 'click',
 'connector.startup-mode' = 'latest-offset',
 'format.type' = 'json'
);

DROP VIEW IF EXISTS e_click_v;
create view e_click_v as select * from e_click;

--
SET table.sql-dialect=hive;

drop table IF EXISTS e_click_hive;
CREATE TABLE e_click_hive (
 user_id STRING,
 ts STRING
) PARTITIONED BY (
 dt STRING
) STORED AS parquet TBLPROPERTIES (
 'sink.partition-commit.trigger'='partition-time',
 'partition.time-extractor.timestamp-pattern'='$dt',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- view -> Hive
SET table.sql-dialect=default;
insert into e_click_hive
 select user_id, ts, DATE_FORMAT(log_ts, 'yyyy-MM-dd')
 from e_click_v;

4. Error message:

Caused by: org.apache.calcite.runtime.CalciteContextException: From line 2, 
column 12 to line 2, column 18: Column 'user_id' not found in any table at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883) at 
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
 at 
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5833)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:5982)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:5967)
 at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5416)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:398)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
 at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
 at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
 at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
 at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
I understand your scenario but I disagree with its assumptions:

"However, the partition of A is empty and thus A is temporarily idle." -
you're assuming that the behavior of the source is to mark itself idle if
data isn't available, but that's clearly source-specific and not behavior
we expect to have in Pulsar source.  A partition may be empty indefinitely
while still being active.  Imagine that the producer is defending a lease -
"I'm here, there's no data, please don't advance the clock".

"we bind idleness to wall clock time" - you're characterizing a specific
strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of
the pipeline.  I wouldn't recommend using withIdleness() with source-based
watermarks.
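
For readers following along, the wall-clock heuristic in question is configured
roughly like this (a sketch only; the event type and its timestamp accessor are
placeholder names, not something from this thread):

// Idleness bound to wall-clock time: mark the split idle after 60s without
// records, independent of event time. ClickEvent is a hypothetical type.
WatermarkStrategy<ClickEvent> strategy =
    WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getTimestamp())
        .withIdleness(Duration.ofMinutes(1));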

I do agree that dynamism in partition assignment can wreak havoc on
watermark correctness.  We have some ideas on the Pulsar side about that
too.  I would ask that we focus on the Flink framework and pipeline
behavior.  By offering a more powerful framework, we encourage stream
storage systems to "rise to the occasion" - treat event time in a
first-class way, optimize for correctness, etc.  In this case, FLIP-167 is
setting the stage for evolution in Pulsar.

Thanks again Arvid for the great discussion.





On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise  wrote:

> At least one big motivation is having (temporary) empty partitions. Let me
> give you an example, why imho idleness is only approximate in this case:
> Assume you have source subtask A, B, C that correspond to 3 source
> partitions and a downstream keyed window operator W.
>
> W would usually trigger on min_watermark(A, B, C). However, the partition
> of A is empty and thus A is temporarily idle. So W triggers on
> min_watermark(B, C). When A is now active again, the watermark implicitly
> is min_watermark(B, C) for A!
>
> Let's further assume that the source is filled by another pipeline before.
> This pipeline experiences technical difficulties for X minutes and could
> not produce into the partition of A, hence the idleness. When the upstream
> pipeline resumes it fills A with some records that are before
> min_watermark(B, C). Any watermark generated from these records is
> discarded as the watermark is monotonic. Therefore, these records will be
> considered late by W and discarded.
>
> Without idleness, we would have simply blocked W until the upstream pipeline
> fully recovers and we would not have had any late records. The same holds
> for any reprocessing where the data of partition A is continuous.
>
> If you look deeper, the issue is that we bind idleness to wall clock time
> (e.g. advance watermark after X seconds without data). Then we assume the
> watermark of the idle partition to be in sync with the slowest partition.
> However, in the case of hiccups, this assumption does not hold at all.
> I don't see any fix for that (easy or not easy) and imho it's inherent to
> the design of idleness.
> We lack information (why is no data coming) and have a heuristic to fix it.
>
> In the case of partition assignment where one subtask has no partition, we
> are probably somewhat safe. We know why no data is coming (no partition)
> and as long as we do not have dynamic partition assignment, there will
> never be a switch to active without restart (for the foreseeable future).
>
> On Fri, Jun 4, 2021 at 10:34 PM Eron Wright  .invalid>
> wrote:
>
> > Yes I'm talking about an implementation of idleness that is unrelated to
> > processing time.  The clear example is partition assignment to subtasks,
> > which probably motivated Flink's idleness functionality in the first
> place.
> >
> > On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise  wrote:
> >
> > > Hi Eron,
> > >
> > > Are you referring to an implementation of idleness that does not rely
> on
> > a
> > > wall clock but on some clock baked into the partition information of
> the
> > > source system?
> > > If so, you are right that it invalidates my points.
> > > Do you have an example on where this is used?
> > >
> > > With a wall clock, you always run into the issues that I describe since
> > you
> > > are effectively mixing event time and processing time...
> > >
> > >
> > > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright  > > .invalid>
> > > wrote:
> > >
> > > > Dawid, I think you're mischaracterizing the idleness signal as
> > > inherently a
> > > > heuristic, but Flink does not impose that.  A source-based watermark
> > (and
> > > > corresponding idleness signal) may well be entirely data-driven,
> > entirely
> > > > deterministic.  Basically you're underselling what the pipeline is
> > > capable
> > > > of, based on painful experiences with using the generic,
> > heuristics-based
> > > > watermark assigner.  Please don't let those experiences overshadow
> > what's
> > > > possible with source-based watermarking.
> > > >
> > > > The idleness signal does have a strict definition, it indicates
> whether
> > > the
> > > > stream is actively participating in advancing the event time 

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Arvid Heise
At least one big motivation is having (temporary) empty partitions. Let me
give you an example, why imho idleness is only approximate in this case:
Assume you have source subtask A, B, C that correspond to 3 source
partitions and a downstream keyed window operator W.

W would usually trigger on min_watermark(A, B, C). However, the partition
of A is empty and thus A is temporarily idle. So W triggers on
min_watermark(B, C). When A is now active again, the watermark implicitly
is min_watermark(B, C) for A!

Let's further assume that the source is filled by another pipeline before.
This pipeline experiences technical difficulties for X minutes and could
not produce into the partition of A, hence the idleness. When the upstream
pipeline resumes it fills A with some records that are before
min_watermark(B, C). Any watermark generated from these records is
discarded as the watermark is monotonic. Therefore, these records will be
considered late by W and discarded.

Without idleness, we would have simply blocked W until the upstream pipeline
fully recovers and we would not have had any late records. The same holds
for any reprocessing where the data of partition A is continuous.

If you look deeper, the issue is that we bind idleness to wall clock time
(e.g. advance watermark after X seconds without data). Then we assume the
watermark of the idle partition to be in sync with the slowest partition.
However, in the case of hiccups, this assumption does not hold at all.
I don't see any fix for that (easy or not easy) and imho it's inherent to
the design of idleness.
We lack information (why is no data coming) and have a heuristic to fix it.

In the case of partition assignment where one subtask has no partition, we
are probably somewhat safe. We know why no data is coming (no partition)
and as long as we do not have dynamic partition assignment, there will
never be a switch to active without restart (for the foreseeable future).
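
To make the min_watermark(...) behaviour above concrete, the aggregation can be
pictured as follows (a simplified illustration only, not Flink's actual
StatusWatermarkValve implementation):

// The combined watermark is the minimum over all inputs that are currently
// active; idle inputs are ignored, which is exactly why a later switch back
// to active can make A's records late.
long combinedWatermark(long[] watermarks, boolean[] idle) {
    long min = Long.MAX_VALUE;
    boolean anyActive = false;
    for (int i = 0; i < watermarks.length; i++) {
        if (!idle[i]) {
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
    }
    // if every input is idle, no watermark can be derived at all
    return anyActive ? min : Long.MIN_VALUE;
}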

On Fri, Jun 4, 2021 at 10:34 PM Eron Wright 
wrote:

> Yes I'm talking about an implementation of idleness that is unrelated to
> processing time.  The clear example is partition assignment to subtasks,
> which probably motivated Flink's idleness functionality in the first place.
>
> On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise  wrote:
>
> > Hi Eron,
> >
> > Are you referring to an implementation of idleness that does not rely on
> a
> > wall clock but on some clock baked into the partition information of the
> > source system?
> > If so, you are right that it invalidates my points.
> > Do you have an example on where this is used?
> >
> > With a wall clock, you always run into the issues that I describe since
> you
> > are effectively mixing event time and processing time...
> >
> >
> > On Fri, Jun 4, 2021 at 6:28 PM Eron Wright  > .invalid>
> > wrote:
> >
> > > Dawid, I think you're mischaracterizing the idleness signal as
> > inherently a
> > > heuristic, but Flink does not impose that.  A source-based watermark
> (and
> > > corresponding idleness signal) may well be entirely data-driven,
> entirely
> > > deterministic.  Basically you're underselling what the pipeline is
> > capable
> > > of, based on painful experiences with using the generic,
> heuristics-based
> > > watermark assigner.  Please don't let those experiences overshadow
> what's
> > > possible with source-based watermarking.
> > >
> > > The idleness signal does have a strict definition, it indicates whether
> > the
> > > stream is actively participating in advancing the event time clock.
> The
> > > status of all participants is considered when aggregating watermarks.
> A
> > > source subtask generally makes the determination based on data, e.g.
> > > whether a topic is assigned to that subtask.
> > >
> > > We have here a modest proposal to add callbacks to the sink function
> for
> > > information that the sink operator already receives.  The practical
> > result
> > > is improved correctness when used with streaming systems that have
> > > first-class support for event time.  The specific changes may be
> > previewed
> > > here:
> > > https://github.com/apache/flink/pull/15950
> > > https://github.com/streamnative/flink/pull/2
> > >
> > > Thank you all for the robust discussion. Do I have your support to
> > proceed
> > > to enhance FLIP-167 with idleness callbacks and to proceed to a vote?
> > >
> > > Eron
> > >
> > >
> > > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise  wrote:
> > >
> > > > While everything I wrote before is still valid, upon further
> > rethinking,
> > > I
> > > > think that the conclusion is not necessarily correct:
> > > > - If the user wants to have pipeline A and B behaving as if A+B was
> > > jointly
> > > > executed in the same pipeline without the intermediate Pulsar topic,
> > > having
> > > the idleness in that topic is the only way to guarantee consistency.
> > > > - We could support the following in the respective sources: If the
> user
> > > > that wants to use a different definition of 

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
Yes I'm talking about an implementation of idleness that is unrelated to
processing time.  The clear example is partition assignment to subtasks,
which probably motivated Flink's idleness functionality in the first place.

On Fri, Jun 4, 2021 at 12:53 PM Arvid Heise  wrote:

> Hi Eron,
>
> Are you referring to an implementation of idleness that does not rely on a
> wall clock but on some clock baked into the partition information of the
> source system?
> If so, you are right that it invalidates my points.
> Do you have an example on where this is used?
>
> With a wall clock, you always run into the issues that I describe since you
> are effectively mixing event time and processing time...
>
>
> On Fri, Jun 4, 2021 at 6:28 PM Eron Wright  .invalid>
> wrote:
>
> > Dawid, I think you're mischaracterizing the idleness signal as
> inherently a
> > heuristic, but Flink does not impose that.  A source-based watermark (and
> > corresponding idleness signal) may well be entirely data-driven, entirely
> > deterministic.  Basically you're underselling what the pipeline is
> capable
> > of, based on painful experiences with using the generic, heuristics-based
> > watermark assigner.  Please don't let those experiences overshadow what's
> > possible with source-based watermarking.
> >
> > The idleness signal does have a strict definition, it indicates whether
> the
> > stream is actively participating in advancing the event time clock.  The
> > status of all participants is considered when aggregating watermarks.  A
> > source subtask generally makes the determination based on data, e.g.
> > whether a topic is assigned to that subtask.
> >
> > We have here a modest proposal to add callbacks to the sink function for
> > information that the sink operator already receives.  The practical
> result
> > is improved correctness when used with streaming systems that have
> > first-class support for event time.  The specific changes may be
> previewed
> > here:
> > https://github.com/apache/flink/pull/15950
> > https://github.com/streamnative/flink/pull/2
> >
> > Thank you all for the robust discussion. Do I have your support to
> proceed
> > to enhance FLIP-167 with idleness callbacks and to proceed to a vote?
> >
> > Eron
> >
> >
> > On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise  wrote:
> >
> > > While everything I wrote before is still valid, upon further
> rethinking,
> > I
> > > think that the conclusion is not necessarily correct:
> > > - If the user wants to have pipeline A and B behaving as if A+B was
> > jointly
> > > executed in the same pipeline without the intermediate Pulsar topic,
> > having
> > > the idleness in that topic is the only way to guarantee consistency.
> > > - We could support the following in the respective sources: If the user
> > > that wants to use a different definition of idleness in B, they can
> just
> > > provide a new idleness definition. At that point, we should discard the
> > > idleness in the intermediate topic while reading.
> > >
> > > If we would agree on the latter way, I think having the idleness in the
> > > topic is of great use because it's a piece of information that cannot
> be
> > > inferred as stated by others. Consequently, we would be able to support
> > all
> > > use cases and can give the user the freedom to express his intent.
> > >
> > >
> > > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise  wrote:
> > >
> > > > I think the core issue in this discussion is that we kind of assume
> > that
> > > > idleness is something universally well-defined. But it's not. It's a
> > > > heuristic to advance data processing in event time where we would
> lack
> > > data
> > > > to do so otherwise.
> > > > Keep in mind that idleness has no real definition in terms of event
> > time
> > > > and leads to severe unexpected results: If you reprocess a data
> stream
> > > with
> > > > temporarily idle partitions, these partitions would not be deemed
> idle
> > on
> > > > reprocessing and there is a realistic chance that records that were
> > > deemed
> > > > late in the live processing case are now perfectly fine records in
> the
> > > > reprocessing case. (I can expand on that if that was too short)
> > > >
> > > > With that in mind, why would a downstream process even try to
> calculate
> > > > the same idleness state as the upstream process? I don't see a point;
> > we
> > > > would just further any imprecision in the calculation.
> > > >
> > > > Let's have a concrete example. Assume that we have upstream pipeline
> A
> > > and
> > > > downstream pipeline B. A has plenty of resources and is live
> processing
> > > > data. Some partitions are idle and that is propagated to the sinks.
> > Now B
> > > > is heavily backpressured and consumes very slowly. B doesn't see any
> > > > idleness directly. B can calculate exact watermarks and use all
> records
> > > for
> > > > it's calculation. Reprocessing would yield the same result for B. If
> we
> > > now
> > > > forward idleness, we can easily find 

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Arvid Heise
Hi Eron,

Are you referring to an implementation of idleness that does not rely on a
wall clock but on some clock baked into the partition information of the
source system?
If so, you are right that it invalidates my points.
Do you have an example on where this is used?

With a wall clock, you always run into the issues that I describe since you
are effectively mixing event time and processing time...


On Fri, Jun 4, 2021 at 6:28 PM Eron Wright 
wrote:

> Dawid, I think you're mischaracterizing the idleness signal as inherently a
> heuristic, but Flink does not impose that.  A source-based watermark (and
> corresponding idleness signal) may well be entirely data-driven, entirely
> deterministic.  Basically you're underselling what the pipeline is capable
> of, based on painful experiences with using the generic, heuristics-based
> watermark assigner.  Please don't let those experiences overshadow what's
> possible with source-based watermarking.
>
> The idleness signal does have a strict definition, it indicates whether the
> stream is actively participating in advancing the event time clock.  The
> status of all participants is considered when aggregating watermarks.  A
> source subtask generally makes the determination based on data, e.g.
> whether a topic is assigned to that subtask.
>
> We have here a modest proposal to add callbacks to the sink function for
> information that the sink operator already receives.  The practical result
> is improved correctness when used with streaming systems that have
> first-class support for event time.  The specific changes may be previewed
> here:
> https://github.com/apache/flink/pull/15950
> https://github.com/streamnative/flink/pull/2
>
> Thank you all for the robust discussion. Do I have your support to proceed
> to enhance FLIP-167 with idleness callbacks and to proceed to a vote?
>
> Eron
>
>
> On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise  wrote:
>
> > While everything I wrote before is still valid, upon further rethinking,
> I
> > think that the conclusion is not necessarily correct:
> > - If the user wants to have pipeline A and B behaving as if A+B was
> jointly
> > executed in the same pipeline without the intermediate Pulsar topic,
> having
> > the idleness in that topic is the only way to guarantee consistency.
> > - We could support the following in the respective sources: If the user
> > that wants to use a different definition of idleness in B, they can just
> > provide a new idleness definition. At that point, we should discard the
> > idleness in the intermediate topic while reading.
> >
> > If we would agree on the latter way, I think having the idleness in the
> > topic is of great use because it's a piece of information that cannot be
> > inferred as stated by others. Consequently, we would be able to support
> all
> > use cases and can give the user the freedom to express his intent.
> >
> >
> > On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise  wrote:
> >
> > > I think the core issue in this discussion is that we kind of assume
> that
> > > idleness is something universally well-defined. But it's not. It's a
> > > heuristic to advance data processing in event time where we would lack
> > data
> > > to do so otherwise.
> > > Keep in mind that idleness has no real definition in terms of event
> time
> > > and leads to severe unexpected results: If you reprocess a data stream
> > with
> > > temporarily idle partitions, these partitions would not be deemed idle
> on
> > > reprocessing and there is a realistic chance that records that were
> > deemed
> > > late in the live processing case are now perfectly fine records in the
> > > reprocessing case. (I can expand on that if that was too short)
> > >
> > > With that in mind, why would a downstream process even try to calculate
> > > the same idleness state as the upstream process? I don't see a point;
> we
> > > would just further any imprecision in the calculation.
> > >
> > > Let's have a concrete example. Assume that we have upstream pipeline A
> > and
> > > downstream pipeline B. A has plenty of resources and is live processing
> > > data. Some partitions are idle and that is propagated to the sinks.
> Now B
> > > is heavily backpressured and consumes very slowly. B doesn't see any
> > > idleness directly. B can calculate exact watermarks and use all records
> > for
> > > its calculation. Reprocessing would yield the same result for B. If we
> > now
> > > forward idleness, we can easily find cases where we would advance the
> > > watermark prematurely while there is data directly available to
> calculate
> > > the exact watermark.
> > >
> > > For me, idleness is just a pipeline-specific heuristic and should be
> > > viewed as such.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski 
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> > Imagine you're starting consuming from the result channel in a
> > situation
> > >> were you have:
> > >> > record4, record3, 

Re: [VOTE] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
Little update on this, more good discussion over the last few days, and the
FLIP will probably be amended to incorporate idleness.   Voting will remain
open until, let's say, mid-next week.

On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski  wrote:

> I would like to ask you to hold on with counting the votes until I get an
> answer for my one question in the dev mailing list thread (sorry if it was
> already covered somewhere).
>
> Best, Piotrek
>
> czw., 3 cze 2021 o 16:12 Jark Wu  napisał(a):
>
> > +1 (binding)
> >
> > Best,
> > Jark
> >
> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 03/06/2021 03:50, Zhou, Brian wrote:
> > > > +1 (non-binding)
> > > >
> > > > Thanks Eron, looking forward to seeing this feature soon.
> > > >
> > > > Thanks,
> > > > Brian
> > > >
> > > > -Original Message-
> > > > From: Arvid Heise 
> > > > Sent: Wednesday, June 2, 2021 15:44
> > > > To: dev
> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
> > > >
> > > >
> > > >
> > > > +1 (binding)
> > > >
> > > > Thanks Eron for driving this effort; it will open up new exciting use
> > > cases.
> > > >
> > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright  > > .invalid>
> > > > wrote:
> > > >
> > > >> After some good discussion about how to enhance the Sink API to
> > > >> process watermarks, I believe we're ready to proceed with a vote.
> > > >> Voting will be open until at least Friday, June 4th, 2021.
> > > >>
> > > >> Reference:
> > > >>
> > > >>
> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-167%3A+Watermarks+for+Sink+API
> > > >>
> > > >> Discussion thread:
> > > >>
> > > >>
> > > >> https://lists.apache.org/thread.html/r5194e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e%40%3Cdev.flink.apache.org%3E
> > > >>
> > > >> Implementation Issue:
> > > >>
> > > >> https://issues.apache.org/jira/browse/FLINK-22700
> > > >>
> > > >> Thanks,
> > > >> Eron Wright
> > > >> StreamNative
> > > >>
> > >
> > >
> >
>


Re: Add control mode for flink

2021-06-04 Thread Peter Huang
I agree with Steven. This logic can be added to a dynamic config framework
that can be bound into Flink operators. We probably don't need to let Flink
runtime handle it.

On Fri, Jun 4, 2021 at 8:11 AM Steven Wu  wrote:

> I am not sure if we should solve this problem in Flink. This is more like
> a dynamic config problem that probably should be solved by some
> configuration framework. Here is one post from google search:
> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>
> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:
>
>> Hi everyone,
>>
>>   Flink jobs are always long-running. When the job is running, users
>> may want to control the job but not stop it. The control reasons can be
>> different as following:
>>
>>1.
>>
>>Change data processing’ logic, such as filter condition.
>>2.
>>
>>Send trigger events to make the progress forward.
>>3.
>>
>>Define some tools to degrade the job, such as limit input qps,
>>sampling data.
>>4.
>>
>>Change log level to debug current problem.
>>
>>   The common way to do this is to stop the job, do modifications and
>> start the job. It may take a long time to recover. In some situations,
>> stopping jobs is intolerable, for example, the job is related to money or
>> important activities.So we need some technologies to control the running
>> job without stopping the job.
>>
>>
>> We propose to add control mode for flink. A control mode based on the
>> restful interface is first introduced. It works by these steps:
>>
>>
>>1. The user can predefine some logic which supports config control,
>>such as filter condition.
>>2. Run the job.
>>3. If the user wants to change the job's running logic, just send a
>>restful request with the responding config.
>>
>> Other control modes will also be considered in the future. More
>> introduction can refer to the doc
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> . If the community likes the proposal, more discussion is needed and a more
>> detailed design will be given later. Any suggestions and ideas are welcome.
>>
>>


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
Dawid, I think you're mischaracterizing the idleness signal as inherently a
heuristic, but Flink does not impose that.  A source-based watermark (and
corresponding idleness signal) may well be entirely data-driven, entirely
deterministic.  Basically you're underselling what the pipeline is capable
of, based on painful experiences with using the generic, heuristics-based
watermark assigner.  Please don't let those experiences overshadow what's
possible with source-based watermarking.

The idleness signal does have a strict definition, it indicates whether the
stream is actively participating in advancing the event time clock.  The
status of all participants is considered when aggregating watermarks.  A
source subtask generally makes the determination based on data, e.g.
whether a topic is assigned to that subtask.

We have here a modest proposal to add callbacks to the sink function for
information that the sink operator already receives.  The practical result
is improved correctness when used with streaming systems that have
first-class support for event time.  The specific changes may be previewed
here:
https://github.com/apache/flink/pull/15950
https://github.com/streamnative/flink/pull/2
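
For readers skimming the thread, the shape of the proposed callbacks is roughly
the following (a hypothetical sketch for illustration only; the interface and
method names here are made up, the authoritative definitions are in FLIP-167
and the PRs above):

// Hypothetical sketch of a watermark/idleness-aware sink writer.
public interface WatermarkAwareSinkWriter<InputT> {

    // regular record path, as today
    void write(InputT element) throws Exception;

    // invoked when the sink operator receives a new watermark
    default void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark)
            throws Exception {}

    // invoked when the stream status observed by the sink flips between ACTIVE and IDLE
    default void markIdle(boolean idle) throws Exception {}
}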

Thank you all for the robust discussion. Do I have your support to proceed
to enhance FLIP-167 with idleness callbacks and to proceed to a vote?

Eron


On Fri, Jun 4, 2021 at 9:08 AM Arvid Heise  wrote:

> While everything I wrote before is still valid, upon further rethinking, I
> think that the conclusion is not necessarily correct:
> - If the user wants to have pipeline A and B behaving as if A+B was jointly
> executed in the same pipeline without the intermediate Pulsar topic, having
> the idleness in that topic is the only way to guarantee consistency.
> - We could support the following in the respective sources: If the user
> that wants to use a different definition of idleness in B, they can just
> provide a new idleness definition. At that point, we should discard the
> idleness in the intermediate topic while reading.
>
> If we would agree on the latter way, I think having the idleness in the
> topic is of great use because it's a piece of information that cannot be
> inferred as stated by others. Consequently, we would be able to support all
> use cases and can give the user the freedom to express his intent.
>
>
> On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise  wrote:
>
> > I think the core issue in this discussion is that we kind of assume that
> > idleness is something universally well-defined. But it's not. It's a
> > heuristic to advance data processing in event time where we would lack
> data
> > to do so otherwise.
> > Keep in mind that idleness has no real definition in terms of event time
> > and leads to severe unexpected results: If you reprocess a data stream
> with
> > temporarily idle partitions, these partitions would not be deemed idle on
> > reprocessing and there is a realistic chance that records that were
> deemed
> > late in the live processing case are now perfectly fine records in the
> > reprocessing case. (I can expand on that if that was too short)
> >
> > With that in mind, why would a downstream process even try to calculate
> > the same idleness state as the upstream process? I don't see a point; we
> > would just further any imprecision in the calculation.
> >
> > Let's have a concrete example. Assume that we have upstream pipeline A
> and
> > downstream pipeline B. A has plenty of resources and is live processing
> > data. Some partitions are idle and that is propagated to the sinks. Now B
> > is heavily backpressured and consumes very slowly. B doesn't see any
> > idleness directly. B can calculate exact watermarks and use all records
> for
> > its calculation. Reprocessing would yield the same result for B. If we
> now
> > forward idleness, we can easily find cases where we would advance the
> > watermark prematurely while there is data directly available to calculate
> > the exact watermark.
> >
> > For me, idleness is just a pipeline-specific heuristic and should be
> > viewed as such.
> >
> > Best,
> >
> > Arvid
> >
> > On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski 
> > wrote:
> >
> >> Hi,
> >>
> >> > Imagine you're starting consuming from the result channel in a
> situation
> >> were you have:
> >> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2,
> >> record1, record0
> >> > Switching to the encoded StreamStatus.IDLE is unnecessary, and might
> >> cause the record3 and record4 to be late depending on how the watermark
> >> progressed in other partitions.
> >>
> >> Yes, I understand this point. But it can also be the other way around.
> >> There might be a large gap between record2 and record3, and users might
> >> prefer or might be not able to duplicate idleness detection logic. The
> >> downstream system might be lacking some kind of information (that is
> only
> >> available in the top level/ingesting system) to correctly set the idle
> >> status.
> >>

[jira] [Created] (FLINK-22882) Tasks are blocked while emitting records

2021-06-04 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-22882:
--

 Summary: Tasks are blocked while emitting records
 Key: FLINK-22882
 URL: https://issues.apache.org/jira/browse/FLINK-22882
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network, Runtime / Task
Affects Versions: 1.14.0
Reporter: Piotr Nowojski


On a cluster I observed symptoms of tasks being blocked for a long time, causing 
long delays with unaligned checkpointing. 99% of those cases were caused by 
`broadcastEmit` of the stream status

{noformat}
2021-06-04 14:41:44,049 ERROR 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool   [] - Blocking wait 
[11059 ms] for an available buffer.
java.lang.Exception: Stracktracegenerator
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:246)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:67)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.writeStreamStatus(RecordWriterOutput.java:136)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus.ensureActive(AnnouncedStatus.java:65)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 

[jira] [Created] (FLINK-22881) Tasks are blocked while emitting stream status

2021-06-04 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-22881:
--

 Summary: Tasks are blocked while emitting stream status
 Key: FLINK-22881
 URL: https://issues.apache.org/jira/browse/FLINK-22881
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network, Runtime / Task
Affects Versions: 1.14.0
Reporter: Piotr Nowojski


On a cluster I observed symptoms of tasks being blocked for a long time, causing 
long delays with unaligned checkpointing. 99% of those cases were caused by 
`broadcastEmit` of the stream status
```
2021-06-04 14:41:44,049 ERROR 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool   [] - Blocking wait 
[11059 ms] for an available buffer.
java.lang.Exception: Stracktracegenerator
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:323)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:290)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:338)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:314)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:246)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:67)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.writeStreamStatus(RecordWriterOutput.java:136)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.streamstatus.AnnouncedStatus.ensureActive(AnnouncedStatus.java:65)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:82)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:182)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.api.connector.source.lib.util.IteratorSourceReader.pollNext(IteratorSourceReader.java:98)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
 ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at 

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Arvid Heise
While everything I wrote before is still valid, upon further rethinking, I
think that the conclusion is not necessarily correct:
- If the user wants to have pipeline A and B behaving as if A+B was jointly
executed in the same pipeline without the intermediate Pulsar topic, having
the idleness in that topic is the only way to guarantee consistency.
- We could support the following in the respective sources: If the user
that wants to use a different definition of idleness in B, they can just
provide a new idleness definition. At that point, we should discard the
idleness in the intermediate topic while reading.

If we would agree on the latter way, I think having the idleness in the
topic is of great use because it's a piece of information that cannot be
inferred as stated by others. Consequently, we would be able to support all
use cases and can give the user the freedom to express his intent.


On Fri, Jun 4, 2021 at 3:43 PM Arvid Heise  wrote:

> I think the core issue in this discussion is that we kind of assume that
> idleness is something universally well-defined. But it's not. It's a
> heuristic to advance data processing in event time where we would lack data
> to do so otherwise.
> Keep in mind that idleness has no real definition in terms of event time
> and leads to severe unexpected results: If you reprocess a data stream with
> temporarily idle partitions, these partitions would not be deemed idle on
> reprocessing and there is a realistic chance that records that were deemed
> late in the live processing case are now perfectly fine records in the
> reprocessing case. (I can expand on that if that was too short)
>
> With that in mind, why would a downstream process even try to calculate
> the same idleness state as the upstream process? I don't see a point; we
> would just further any imprecision in the calculation.
>
> Let's have a concrete example. Assume that we have upstream pipeline A and
> downstream pipeline B. A has plenty of resources and is live processing
> data. Some partitions are idle and that is propagated to the sinks. Now B
> is heavily backpressured and consumes very slowly. B doesn't see any
> idleness directly. B can calculate exact watermarks and use all records for
> its calculation. Reprocessing would yield the same result for B. If we now
> forward idleness, we can easily find cases where we would advance the
> watermark prematurely while there is data directly available to calculate
> the exact watermark.
>
> For me, idleness is just a pipeline-specific heuristic and should be
> viewed as such.
>
> Best,
>
> Arvid
>
> On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> > Imagine you're starting consuming from the result channel in a situation
>> were you have:
>> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2,
>> record1, record0
>> > Switching to the encoded StreamStatus.IDLE is unnecessary, and might
>> cause the record3 and record4 to be late depending on how the watermark
>> progressed in other partitions.
>>
>> Yes, I understand this point. But it can also be the other way around.
>> There might be a large gap between record2 and record3, and users might
>> prefer or might be not able to duplicate idleness detection logic. The
>> downstream system might be lacking some kind of information (that is only
>> available in the top level/ingesting system) to correctly set the idle
>> status.
>>
>> Piotrek
>>
>> pt., 4 cze 2021 o 12:30 Dawid Wysakowicz 
>> napisał(a):
>>
>> >
>> > Same as Eron I don't follow this point. Any streaming sink can be used
>> as
>> > this kind of transient channel. Streaming sinks, like Kafka, are also
>> used
>> > to connect one streaming system with another one, also for an immediate
>> > consumption.
>> >
>> > Sure it can, but imo it is rarely the primary use case why you want to
>> > offload the channels to an external persistent system. Again in my
>> > understanding StreamStatus is something transient, e.g. part of our
>> > external system went offline. I think those kind of events should not be
>> > persisted.
>> >
>> > Both watermarks and idleness status can be some
>> > inherent property of the underlying data stream. if an
>> upstream/ingesting
>> > system knows that this particular stream/partition of a stream is going
>> > idle (for example for a couple of hours), why does this information
>> have to
>> > be re-created in the downstream system using some heuristic? It could be
>> > explicitly encoded.
>> >
>> > Because it's most certainly not true in the downstream. The idleness
>> works
>> > usually according to a heuristic: "We have not seen records for 5
>> minutes,
>> > so there is a fair chance we won't see records for the next 5 minutes,
>> so
>> > let's not wait for watermarks for now." That heuristic most certainly
>> won't
>> > hold for a downstream persistent storage.
>> >
>> > Imagine you're starting consuming from the result channel in a situation
>> > were you have:
>> >
>> > 

Re: Add control mode for flink

2021-06-04 Thread Steven Wu
I am not sure if we should solve this problem in Flink. This is more like a
dynamic config problem that probably should be solved by some configuration
framework. Here is one post from google search:
https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a

On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:

> Hi everyone,
>
>   Flink jobs are always long-running. When the job is running, users
> may want to control the job but not stop it. The control reasons can be
> different as following:
>
>1.
>
>Change data processing’ logic, such as filter condition.
>2.
>
>Send trigger events to make the progress forward.
>3.
>
>Define some tools to degrade the job, such as limit input qps,
>sampling data.
>4.
>
>Change log level to debug current problem.
>
>   The common way to do this is to stop the job, do modifications and
> start the job. It may take a long time to recover. In some situations,
> stopping jobs is intolerable, for example, the job is related to money or
> important activities.So we need some technologies to control the running
> job without stopping the job.
>
>
> We propose to add control mode for flink. A control mode based on the
> restful interface is first introduced. It works by these steps:
>
>
>1. The user can predefine some logic which supports config control,
>such as filter condition.
>2. Run the job.
>3. If the user wants to change the job's running logic, just send a
>restful request with the responding config.
>
> Other control modes will also be considered in the future. More
> introduction can refer to the doc
> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
> . If the community likes the proposal, more discussion is needed and a more
> detailed design will be given later. Any suggestions and ideas are welcome.
>
>


Add control mode for flink

2021-06-04 Thread 刘建刚
Hi everyone,

  Flink jobs are always long-running. When the job is running, users
may want to control the job without stopping it. The reasons for such control
can differ, for example:

   1. Change the data processing logic, such as a filter condition.

   2. Send trigger events to move the progress forward.

   3. Define tools to degrade the job, such as limiting input qps or
   sampling data.

   4. Change the log level to debug the current problem.

  The common way to do this is to stop the job, make the modifications, and
restart it. Recovery may take a long time. In some situations, stopping the job
is intolerable, for example when the job is related to money or important
activities. So we need a way to control the running job without stopping it.


We propose to add a control mode for Flink. A control mode based on a
RESTful interface is introduced first. It works in these steps:


   1. The user can predefine some logic which supports config control, such
   as filter condition.
   2. Run the job.
   3. If the user wants to change the job's running logic, just send a
   RESTful request with the corresponding config.

Other control modes will also be considered in the future. A more detailed
introduction can be found in the doc:
https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
If the community likes the proposal, more discussion is needed and a more
detailed design will be given later. Any suggestions and ideas are welcome.
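
To make step 1 concrete, one way such config-controlled logic can already be
expressed is the broadcast-state pattern, where the RESTful endpoint would feed
a control stream (a sketch under assumptions; Event, ControlEvent, and
matches() are illustrative placeholders, not part of the proposal):

// Broadcast the latest control config to all subtasks and apply it per record.
MapStateDescriptor<String, String> controlDesc =
    new MapStateDescriptor<>("control", Types.STRING, Types.STRING);

DataStream<Event> filtered = events
    .connect(controlEvents.broadcast(controlDesc))
    .process(new BroadcastProcessFunction<Event, ControlEvent, Event>() {
        @Override
        public void processBroadcastElement(ControlEvent ctrl, Context ctx,
                                            Collector<Event> out) throws Exception {
            // remember the most recent filter condition pushed via the control channel
            ctx.getBroadcastState(controlDesc).put("filter", ctrl.getFilterExpression());
        }

        @Override
        public void processElement(Event value, ReadOnlyContext ctx,
                                   Collector<Event> out) throws Exception {
            String filter = ctx.getBroadcastState(controlDesc).get("filter");
            // apply the (possibly updated) filter; matches() is a placeholder evaluator
            if (filter == null || matches(value, filter)) {
                out.collect(value);
            }
        }
    });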


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Arvid Heise
I think the core issue in this discussion is that we kind of assume that
idleness is something universally well-defined. But it's not. It's a
heuristic to advance data processing in event time where we would lack data
to do so otherwise.
Keep in mind that idleness has no real definition in terms of event time
and leads to severe unexpected results: If you reprocess a data stream with
temporarily idle partitions, these partitions would not be deemed idle on
reprocessing and there is a realistic chance that records that were deemed
late in the live processing case are now perfectly fine records in the
reprocessing case. (I can expand on that if that was too short)

With that in mind, why would a downstream process even try to calculate the
same idleness state as the upstream process? I don't see a point; we would
just further any imprecision in the calculation.

Let's have a concrete example. Assume that we have upstream pipeline A and
downstream pipeline B. A has plenty of resources and is live processing
data. Some partitions are idle and that is propagated to the sinks. Now B
is heavily backpressured and consumes very slowly. B doesn't see any
idleness directly. B can calculate exact watermarks and use all records for
it's calculation. Reprocessing would yield the same result for B. If we now
forward idleness, we can easily find cases where we would advance the
watermark prematurely while there is data directly available to calculate
the exact watermark.

For me, idleness is just a pipeline-specific heuristic and should be viewed
as such.

Best,

Arvid

On Fri, Jun 4, 2021 at 2:01 PM Piotr Nowojski  wrote:

> Hi,
>
> > Imagine you're starting consuming from the result channel in a situation
> were you have:
> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2,
> record1, record0
> > Switching to the encoded StreamStatus.IDLE is unnecessary, and might
> cause the record3 and record4 to be late depending on how the watermark
> progressed in other partitions.
>
> Yes, I understand this point. But it can also be the other way around.
> There might be a large gap between record2 and record3, and users might
> prefer or might be not able to duplicate idleness detection logic. The
> downstream system might be lacking some kind of information (that is only
> available in the top level/ingesting system) to correctly set the idle
> status.
>
> Piotrek
>
> pt., 4 cze 2021 o 12:30 Dawid Wysakowicz 
> napisał(a):
>
> >
> > Same as Eron I don't follow this point. Any streaming sink can be used as
> > this kind of transient channel. Streaming sinks, like Kafka, are also
> used
> > to connect one streaming system with another one, also for an immediate
> > consumption.
> >
> > Sure it can, but imo it is rarely the primary use case why you want to
> > offload the channels to an external persistent system. Again in my
> > understanding StreamStatus is something transient, e.g. part of our
> > external system went offline. I think those kind of events should not be
> > persisted.
> >
> > Both watermarks and idleness status can be some
> > inherent property of the underlying data stream. if an upstream/ingesting
> > system knows that this particular stream/partition of a stream is going
> > idle (for example for a couple of hours), why does this information have
> to
> > be re-created in the downstream system using some heuristic? It could be
> > explicitly encoded.
> >
> > Because it's most certainly not true in the downstream. The idleness
> works
> > usually according to a heuristic: "We have not seen records for 5
> minutes,
> > so there is a fair chance we won't see records for the next 5 minutes, so
> > let's not wait for watermarks for now." That heuristic most certainly
> won't
> > hold for a downstream persistent storage.
> >
> > Imagine you're starting consuming from the result channel in a situation
> > where you have:
> >
> > record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, record1,
> > record0
> >
> > Switching to the encoded StreamStatus.IDLE is unnecessary, and might cause
> > the record3 and record4 to be late depending on how the watermark
> > progressed in other partitions.
> >
> > I understand Eron's use case, which is not about storing the StreamStatus,
> > but performing an immediate aggregation or, said differently, changing the
> > partitioning/granularity of records and watermarks externally to Flink.
> > The partitioning produced by Flink is actually never persisted in that case.
> > In this case I agree exposing the StreamStatus makes sense. I am still
> > concerned it will lead to storing the StreamStatus which can lead to many
> > subtle problems.
> > On 04/06/2021 11:53, Piotr Nowojski wrote:
> >
> > Hi,
> >
> > Thanks for picking up this discussion. For the record, I also think we
> > shouldn't expose latency markers.
> >
> > About the stream status
> >
> >
> >  Persisting the StreamStatus
> >
> > I don't agree with the view that sinks are "storing" the 

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-04 Thread Till Rohrmann
As I've said I am not a security expert and that's why I have to ask for
clarification, Gabor. You are saying that if we configure a truststore for
the REST endpoint with a single trusted certificate which has been
generated by the operator of the Flink cluster, then the attacker can
generate a new certificate, sign it and then talk to the Flink cluster if
he has access to the node on which the REST endpoint runs? My understanding
was that you need the corresponding private key which in my proposed setup
would be under the control of the operator as well (e.g. stored in a
keystore on the same machine but guarded by some secret). That way (if I am
not mistaken), only the entity which has access to the keystore is able to
talk to the Flink cluster.

Maybe we are also getting our wires crossed here and are talking about
different things.

Thanks for listing the pros and cons of Kerberos. Concerning what other
authentication mechanisms are used in the industry, I am not 100% sure.

Cheers,
Till

On Fri, Jun 4, 2021 at 11:09 AM Gabor Somogyi 
wrote:

> > I did not mean for the user to sign its own certificates but for the
> operator of the cluster. Once the user request hits the proxy, it should no
> longer be under his control. I think I do not fully understand yet why this
> would not work.
> I said it's not solving the authentication problem over any proxy. Even if
> the operator is signing the certificate, one can have access to an internal
> node.
> In such a case anybody can craft certificates which are accepted by the server.
> Once accepted, a bad actor can cancel jobs, causing huge impact.
>
> > Also, I am missing a bit the comparison of Kerberos to other
> authentication mechanisms and why they were rejected in favour of Kerberos.
> PROS:
> * Since it's not depending on cloud provider and/or k8s or bare-metal etc.
> deployment it's the biggest plus
> * Centralized with tools and no need to write tons of tools around
> * There are clients/tools on almost all OS-es and several languages
> * Super huge users are using it for years in production w/o huge issues
> * Provides cross-realm trust possibility amongst other features
> * Several open source components using it which could increase
> compatibility
>
> CONS:
> * Not everybody using kerberos
> * It would increase the code footprint but this is true for many features
> (as a side note I'm here to maintain it)
>
> Feel free to add your points because it only represents a single viewpoint.
> Also if you have any better option for strong authentication please share
> it and we can consider the pros/cons here.
>
> BR,
> G
>
>
> On Fri, Jun 4, 2021 at 10:32 AM Till Rohrmann 
> wrote:
>
>> I did not mean for the user to sign its own certificates but for the
>> operator of the cluster. Once the user request hits the proxy, it should no
>> longer be under his control. I think I do not fully understand yet why this
>> would not work.
>>
>> What I would like to avoid is to add more complexity into Flink if there
>> is an easy solution which fulfills the requirements. That's why I would
>> like to exercise thoroughly through the different alternatives. Also, I am
>> missing a bit the comparison of Kerberos to other authentication mechanisms
>> and why they were rejected in favour of Kerberos.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> I think there might be possible alternatives but it seems Kerberos on
>>> the rest endpoint ticks all the right boxes and provides a super clean and
>>> simple solution for strong authentication.
>>>
>>> I wouldn’t even consider sidecar proxies etc if we can solve it in such
>>> a simple way as proposed by G.
>>>
>>> Cheers
>>> Gyula
>>>
>>> On Fri, 4 Jun 2021 at 10:03, Till Rohrmann  wrote:
>>>
 I am not saying that we shouldn't add a strong authentication mechanism
 if there are good reasons for it. I primarily would like to understand the
 context a bit better in order to give qualified feedback and come to a good
 decision. In order to do this, I have the feeling that we haven't fully
 considered all available options which are on the table, tbh.

 Does the problem of certificate expiry also apply for self-signed
 certificates? If yes, then this should then also be a problem for the
 internal encryption of Flink's communication. If not, then one could use
 self-signed certificates with a longer validity to solve the mentioned
 issue.

 I think you can set up Flink in such a way that you don't have to
 handle all the different certificates. For example, you could deploy Flink
 with a "sidecar proxy" which is responsible for the authentication using an
 arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local
 network interface. That way, the REST endpoint would only be available
 through the sidecar proxy. Additionally, one could enable SSL for this
 communication. Would this be a 

[jira] [Created] (FLINK-22880) Remove "blink" term in code base

2021-06-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-22880:


 Summary: Remove "blink" term in code base
 Key: FLINK-22880
 URL: https://issues.apache.org/jira/browse/FLINK-22880
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Apart from FLINK-22879 and some API parts (such as EnvironmentSettings and the old 
SQL Client YAML), we should not use the term "blink" in the code base and 
documentation anymore. To give some background information, we should only 
document that the current planner was called the "blink planner" in the past.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22879) Remove "blink" suffix from table modules

2021-06-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-22879:


 Summary: Remove "blink" suffix from table modules
 Key: FLINK-22879
 URL: https://issues.apache.org/jira/browse/FLINK-22879
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Timo Walther


In order to reduce confusion around "What is Flink?/What is Blink?" we should 
remove the term {{blink}} from the following modules:

{code}
flink-table-planner-blink
flink-table-runtime-blink
flink-table-uber-blink
{code}

It is up for discussion if we will:
- just perform the refactoring,
- keep the old modules for a while,
- or move the contents to new modules and link to those from the old modules

In any case we should make sure to keep the Git history.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Piotr Nowojski
Hi,

> Imagine you're starting consuming from the result channel in a situation
> where you have:
> record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2,
> record1, record0
> Switching to the encoded StreamStatus.IDLE is unnecessary, and might
> cause the record3 and record4 to be late depending on how the watermark
> progressed in other partitions.

Yes, I understand this point. But it can also be the other way around.
There might be a large gap between record2 and record3, and users might
prefer not to, or might not be able to, duplicate the idleness detection logic. The
downstream system might be lacking some kind of information (that is only
available in the top level/ingesting system) to correctly set the idle
status.

Piotrek

pt., 4 cze 2021 o 12:30 Dawid Wysakowicz 
napisał(a):

>
> Same as Eron I don't follow this point. Any streaming sink can be used as
> this kind of transient channel. Streaming sinks, like Kafka, are also used
> to connect one streaming system with another one, also for an immediate
> consumption.
>
> Sure it can, but imo it is rarely the primary use case why you want to
> offload the channels to an external persistent system. Again in my
> understanding StreamStatus is something transient, e.g. part of our
> external system went offline. I think those kind of events should not be
> persisted.
>
> Both watermarks and idleness status can be some
> inherent property of the underlying data stream. If an upstream/ingesting
> system knows that this particular stream/partition of a stream is going
> idle (for example for a couple of hours), why does this information have to
> be re-created in the downstream system using some heuristic? It could be
> explicitly encoded.
>
> Because it's most certainly not true in the downstream. The idleness works
> usually according to a heuristic: "We have not seen records for 5 minutes,
> so there is a fair chance we won't see records for the next 5 minutes, so
> let's not wait for watermarks for now." That heuristic most certainly won't
> hold for a downstream persistent storage.
>
> Imagine you're starting consuming from the result channel in a situation
> where you have:
>
> record4, record3, StreamStatus.ACTIVE, StreamStatus.IDLE record2, record1,
> record0
>
> Switching to the encoded StreamStatus.IDLE is unnecessary, and might cause
> the record3 and record4 to be late depending on how the watermark
> progressed in other partitions.
>
> I understand Eron's use case, which is not about storing the StreamStatus,
> but performing an immediate aggregation or said differently changing the
> partitioning/granularity of records and watermarks externally to Flink.
> The partitioning produced by Flink is actually never persisted in that case. In
> this case I agree exposing the StreamStatus makes sense. I am still
> concerned it will lead to storing the StreamStatus which can lead to many
> subtle problems.
> On 04/06/2021 11:53, Piotr Nowojski wrote:
>
> Hi,
>
> Thanks for picking up this discussion. For the record, I also think we
> shouldn't expose latency markers.
>
> About the stream status
>
>
>  Persisting the StreamStatus
>
> I don't agree with the view that sinks are "storing" the data/idleness
> status. This nomenclature only makes sense if we are talking about
> streaming jobs producing batch data.
>
>
> In my understanding a StreamStatus makes sense only when talking about
> immediately consumed transient channels such as between operators within
> a single job.
>
> Same as Eron I don't follow this point. Any streaming sink can be used as
> this kind of transient channel. Streaming sinks, like Kafka, are also used
> to connect one streaming system with another one, also for an immediate
> consumption.
>
> You could say the same thing about watermarks (note they are usually
> generated in Flink based on the incoming events) and I would not agree with
> it in the same way. Both watermarks and idleness status can be some
> inherent property of the underlying data stream. If an upstream/ingesting
> system knows that this particular stream/partition of a stream is going
> idle (for example for a couple of hours), why does this information have to
> be re-created in the downstream system using some heuristic? It could be
> explicitly encoded.  If you want to pass watermarks explicitly to a next
> downstream streaming system, because you do not want to recreate them from
> the events using a duplicated logic, why wouldn't you like to do the same
> thing with the idleness?
>
> Also keep in mind that I would expect that a user can decide whether he
> wants to persist the watermarks/stream status on his own. This shouldn't be
> obligatory.
>
> For me there is one good reason to not expose stream status YET. That is,
> if we are sure that we do not need this just yet, while at the same time we
> don't want to expand the Public/PublicEvolving API, as this always
> increases the maintenance cost.
>
> Best,
> Piotrek
>
>
> pt., 4 cze 2021 o 10:57 Eron Wright  
> 

[jira] [Created] (FLINK-22878) Allow placeholder options in format factories

2021-06-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-22878:


 Summary: Allow placeholder options in format factories
 Key: FLINK-22878
 URL: https://issues.apache.org/jira/browse/FLINK-22878
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther


The current factory design does not allow placeholder options in 
{{EncodingFormatFactory}} or {{DecodingFormatFactory}}.

The past has shown that placeholder options are used at a couple of locations.
See FLINK-22475 or {{KafkaOptions#PROPERTIES_PREFIX}}.

We should think about adding additional functionality to {{ReadableConfig}} 
or a special {{ConfigOption}} type to finally solve this problem. This could 
also be useful for FLIP-129, and it would solve the [current shortcomings for 
the Confluent Avro 
registry|https://github.com/apache/flink/pull/15808#discussion_r645494282].
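
For illustration only (this is not the actual factory API, just what a placeholder 
option such as {{properties.*}} boils down to): every key under the prefix is 
stripped and forwarded as-is to the underlying client.

{code:java}
import java.util.HashMap;
import java.util.Map;

public final class PrefixOptionsExample {

    /** Collects all options under the given prefix and strips the prefix. */
    static Map<String, String> extractPrefixed(Map<String, String> options, String prefix) {
        Map<String, String> forwarded = new HashMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                forwarded.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Map<String, String> formatOptions = new HashMap<>();
        formatOptions.put("properties.bootstrap.servers", "localhost:9092");
        formatOptions.put("properties.group.id", "my-group");
        formatOptions.put("format", "avro-confluent");

        // prints the two "properties." entries with the prefix removed
        System.out.println(extractPrefixed(formatOptions, "properties."));
    }
}
{code}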



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22877) Remove BatchTableEnvironment and related API classes

2021-06-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-22877:


 Summary: Remove BatchTableEnvironment and related API classes
 Key: FLINK-22877
 URL: https://issues.apache.org/jira/browse/FLINK-22877
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Remove BatchTableEnvironment and other DataSet related API classes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Discuss] Planning Flink 1.14

2021-06-04 Thread JING ZHANG
Hi all,

@Xintong Song
Thanks for reminding me, I would contact Jark to update the wiki page.

Besides, I'd like to provide more input by sharing our experience with
upgrading the internal version of Flink.

Flink has been widely used in the production environment since 2018 in our
company. Our internal version is far behind the latest stable version of
the community by about 1 year. We upgraded the internal Flink version to
1.10 version in March last year, and we plan to upgrade directly to 1.13
next month (missed 1.11 and 1.12 versions). We wish to use the latest
version as soon as possible. However, in fact we follow up with the
community's latest stable release version almost once a year because
upgrading to a new version is a time-consuming process.

I list the detailed work as follows.

a. Before release new internal version
1) Required: Cherrypick internal features to the new Flink branch. A few
features need to be redeveloped based on the new branch code base.
BTW, the cost grows heavier and heavier since we maintain more and
more internal features in our internal version.
2) Optional: Some internal connectors need to adapt to the new API
3) Required: Surrounding products need to be updated based on the new API, for
example, the internal Flink SQL web development platform
4) Required: Regression tests

b. After release, encourage users to upgrade existing jobs (thousands of
jobs) to the new version. Users need some time to:
1) Repackage the jar for DataStream jobs
2) For critical jobs, users need to run the job on the two versions at the
same time for a while, and migrate to the new job only after comparing the
data carefully.
3) Pure ETL SQL jobs are easy to bump up. But other Flink SQL jobs with
stateful operators need extra effort because Flink SQL jobs do not
support state compatibility yet.

Best regards,
JING ZHANG

Prasanna kumar  于2021年6月4日周五 下午2:27写道:

> Hi all,
>
> We are using Flink for our eventing system. Overall we are very happy with
> the tech, documentation and community support and quick replies in mails.
>
> My last 1 year experience with versions.
>
> We were working on 1.10 initially during our research phase, then we
> stabilised with 1.11 as we moved on, but by the time we were about to get
> into production 1.12 was released. As with all software and products,
> there were bugs reported. So we waited till 1.12.2 was released and then
> upgraded. Within a month of us doing it 1.13 got released.
>
> But from past experience, we waited till at least a couple of minor
> versions (fixing bugs) got released before we moved onto a newer version.
> The development happens at a rapid pace in Flink (which is good in
> terms of features) but adopting and moving the production code to a newer
> version 3/4 times a year is an onerous effort. For example, the memory
> model was changed in one of the releases (there is good documentation).
> But as a production user, to adopt the newer version at least a month of
> testing is required with a huge-scale environment. We also do not want to
> be behind more than 2 versions at any point of time.
>
> I personally feel 2 major releases a year, or at most a release once every
> 5 months, is good.
>
> Thanks
> Prasanna.
>
> On Fri, Jun 4, 2021 at 9:38 AM Xintong Song  wrote:
>
> > Thanks everyone for the feedback.
> >
> > @Jing,
> > Thanks for the inputs. Could you please ask a committer who works
> together
> > with you on these items to fill them into the feature collecting wiki
> page
> > [1]? I assume Jark, who co-edited the flip wiki page, is working with
> you?
> >
> > @Kurt, @Till and @Seth,
> > First of all, a few things that potentially demotivate users from
> > upgrading, observed from users that I've been in touch with.
> > 1. It takes time for Flink major releases to get stabilized. Many users
> > tend to wait for the bugfix releases (x.y.1/2, or even x.y.3/4) rather
> > than upgrading to x.y.0 immediately. This could take months, sometimes even
> > after the next major release.
> > 2. Many users maintain an internal version of Flink, with customized
> > features for their specific businesses. For them, upgrading Flink requires
> > significant efforts to rebase those customized features. On the other hand,
> > the more versions they are left behind, the harder it is to contribute those
> > features to the community, becoming a vicious cycle.
> >
> > I think the question to be answered is how we prioritize between
> > stabilizing a previous major release and cutting a new major release. So
> > far, it feels like the new release takes priority. I recall that we have waited
> > for weeks to release 1.11.3 because people were busy stabilizing 1.12.0.
> > What if more resources lean towards the bugfix releases? We may have a more
> > explicit schedule for the bugfix releases. E.g., try to always release the
> > first bugfix release 2 weeks after the major release, the second bugfix
> > release 4 weeks after that, and release on-demand starting from the 

[jira] [Created] (FLINK-22876) Adding SharedObjects junit rule to ease test development

2021-06-04 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-22876:
---

 Summary: Adding SharedObjects junit rule to ease test development
 Key: FLINK-22876
 URL: https://issues.apache.org/jira/browse/FLINK-22876
 Project: Flink
  Issue Type: Improvement
  Components: Test Infrastructure
Affects Versions: 1.14.0
Reporter: Arvid Heise
Assignee: Arvid Heise
 Fix For: 1.14.0


Most tests rely on static variables to sync between test code and UDFs to avoid 
serialization issues. However, static variables are error-prone and the tests 
look quaint.
SharedObjects allow test developers to forget about the serialization and just 
use the objects across thread boundaries.

The main idea is that shared objects are bound to the scope of a test case 
instead of a class. That allows us to:
* get rid of all nasty reset methods for reused parts and most importantly 
avoid test bugs that result in us forgetting to reset (think of a latch in a 
shared WaitingSink)
* it’s easier to reason about the test setup
* it will allow us to share more code and provide more primitives (e.g. have 
some canonical way to wait for certain events)
* run tests in parallel that reuse classes depending on shared objects
* it will also be significantly easier to write tests for contributors. all 
student groups had issues with this synchronization of test code and UDFs.
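
For illustration, a rough sketch of how such a rule could look (hypothetical names 
and details, not the final implementation):

{code:java}
import org.junit.rules.ExternalResource;

import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of the proposed rule: objects live in a JVM-wide registry,
 * test code and UDFs only pass around small serializable handles.
 */
public class SharedObjectsRule extends ExternalResource {

    // registry shared across threads of the same JVM (as in local MiniCluster tests)
    private static final Map<String, Object> REGISTRY = new ConcurrentHashMap<>();

    // scope id makes handles unique per test case, so no manual reset is needed
    private final String scopeId = UUID.randomUUID().toString();

    public <T> Handle<T> add(T object) {
        String key = scopeId + "-" + UUID.randomUUID();
        REGISTRY.put(key, object);
        return new Handle<>(key);
    }

    @Override
    protected void after() {
        // drop everything that belongs to this test case
        REGISTRY.keySet().removeIf(k -> k.startsWith(scopeId));
    }

    /** Serializable handle that can be captured by a UDF closure. */
    public static final class Handle<T> implements Serializable {
        private final String key;

        private Handle(String key) {
            this.key = key;
        }

        @SuppressWarnings("unchecked")
        public T get() {
            return (T) REGISTRY.get(key);
        }
    }
}
{code}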



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22875) Cannot work by Pickler and Unpickler

2021-06-04 Thread JYXL (Jira)
JYXL created FLINK-22875:


 Summary: Cannot work by Pickler and Unpickler
 Key: FLINK-22875
 URL: https://issues.apache.org/jira/browse/FLINK-22875
 Project: Flink
  Issue Type: Bug
Reporter: JYXL






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Piotr Nowojski
Hi,

Thanks for picking up this discussion. For the record, I also think we
shouldn't expose latency markers.

About the stream status

>  Persisting the StreamStatus

I don't agree with the view that sinks are "storing" the data/idleness
status. This nomenclature only makes sense if we are talking about
streaming jobs producing batch data.

> In my understanding a StreamStatus makes sense only when talking about
> immediately consumed transient channels such as between operators within
> a single job.

Same as Eron I don't follow this point. Any streaming sink can be used as
this kind of transient channel. Streaming sinks, like Kafka, are also used
to connect one streaming system with another one, also for an immediate
consumption.

You could say the same thing about watermarks (note they are usually
generated in Flink based on the incoming events) and I would not agree with
it in the same way. Both watermarks and idleness status can be some
inherent property of the underlying data stream. If an upstream/ingesting
system knows that this particular stream/partition of a stream is going
idle (for example for a couple of hours), why does this information have to
be re-created in the downstream system using some heuristic? It could be
explicitly encoded.  If you want to pass watermarks explicitly to a next
downstream streaming system, because you do not want to recreate them from
the events using a duplicated logic, why wouldn't you like to do the same
thing with the idleness?

Also keep in mind that I would expect that a user can decide whether he
wants to persist the watermarks/stream status on his own. This shouldn't be
obligatory.

For me there is one good reason to not expose stream status YET. That is,
if we are sure that we do not need this just yet, while at the same time we
don't want to expand the Public/PublicEvolving API, as this always
increases the maintenance cost.

Best,
Piotrek


pt., 4 cze 2021 o 10:57 Eron Wright 
napisał(a):

> I believe that the correctness of watermarks and stream status markers is
> determined entirely by the source (ignoring the generic assigner).  Such
> stream elements are known not to overtake records, and aren't transient
> from a pipeline perspective.  I do agree that recoveries may be lossy if
> some operator state is transient (e.g. valve state).
>
> Consider that status markers already affect the flow of watermarks (e.g.
> suppression), and thus affect operator behavior.  Seems to me that exposing
> the idleness state is no different than exposing a watermark.
>
> The high-level story is, there is a need for the Flink job to be
> transparent or neutral with respect to the event time clock.  I believe
> this is possible if time flows with high fidelity from source to sink.  Of
> course, one always has the choice as to whether to use source-based
> watermarks; as you mentioned, requirements vary.
>
> Regarding the Pulsar specifics, we're working on a community proposal that
> I'm anxious to share.  To answer your question, the broker aggregates
> watermarks from multiple producers who are writing to a single topic.
> Each sink
> subtask is a producer.  The broker considers each producer's assertions
> (watermarks, idleness) to be independent inputs, much like the case with
> the watermark valve.
>
> On your concern about idleness causing false late events, I understand your
> point but don't think it applies if the keyspace assignments are stable.
>
> I hope this explains to your satisfaction.
>
> - Eron
>
>
>
>
>
> On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz 
> wrote:
>
> > Hi Eron,
> >
> > I might be missing some background on Pulsar partitioning but something
> > seems off to me. What is the chunk/batch/partition that Pulsar brokers
> > will additionally combine watermarks for? Isn't it the case that only a
> > single Flink sub-task would write to such a chunk and thus will produce
> > an aggregated watermark already via the writeWatermark method?
> >
> > Personally I am really skeptical about exposing the StreamStatus in any
> > Producer API. In my understanding the StreamStatus is a transient
> > setting of a consumer of data. StreamStatus is a mechanism for making a
> > tradeoff between correctness (how many late elements that are behind
> > watermark we have) vs making progress. IMO one has to be extra cautious
> > when it comes to persistent systems. Again I might be missing the exact
> > use case you are trying to solve here, but I can imagine multiple jobs
> > reading from such a stream which might have different correctness
> > requirements. Just quickly throwing an idea out of my head you might
> > want to have an entirely correct results which can be delayed for
> > minutes, and a separate task that produces quick insights within
> > seconds. Another thing to consider is that by the time the downstream
> > job starts consuming the upstream one might have produced records to the
> > previously idle chunk. Persisting the StreamStatus in such a 

[jira] [Created] (FLINK-22874) flink table partition trigger doesn't effect as expectation when sink into hive table

2021-06-04 Thread Spongebob (Jira)
Spongebob created FLINK-22874:
-

 Summary: flink table partition trigger doesn't effect as 
expectation when sink into hive table
 Key: FLINK-22874
 URL: https://issues.apache.org/jira/browse/FLINK-22874
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: Spongebob


I am trying to sink into a Hive partitioned table whose partition commit trigger 
is declared as "partition-time", and I have assigned a watermark on the DataStream. 
When I input some data into the DataStream, the Hive partition is not committed on 
time. Here's my code:
{code:java}
//ddl of hive table 
create table test_table(username string)
partitioned by (ts bigint)
stored as orc
TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);{code}
{code:java}
// flink application code

val streamEnv = ...
val dataStream:DataStream[(String, Long)] = ...


// assign watermark and output watermark info in processFunction
class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
  override def processElement(value: (String, Long), ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: Collector[(String, Long, Long)]): Unit = {
    out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
  }
}

val resultStream = dataStream
  .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
    .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
      override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
        element._2 * 1000
      }
    }))
  .process(new MyProcessFunction)

//
val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())

// convert dataStream into hive catalog table and sink into hive
streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
val catalog = ...
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql("insert into test_table select _1,_2 from default_catalog.default_database.test_catalog_t").print()


// flink uses the default parallelism 4
// input data
(a, 1)
(b, 2)
(c, 3)
(d, 4)
(a, 5)
 ...

// result
there are many partition directories on HDFS but they are all in-progress files 
and they are never committed to the Hive metastore.{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-04 Thread Gabor Somogyi
> I did not mean for the user to sign its own certificates but for the
operator of the cluster. Once the user request hits the proxy, it should no
longer be under his control. I think I do not fully understand yet why this
would not work.
I said it's not solving the authentication problem over any proxy. Even if
the operator is signing the certificate, one can have access to an internal
node.
In such a case anybody can craft certificates which are accepted by the server.
Once accepted, a bad actor can cancel jobs, causing huge impact.

> Also, I am missing a bit the comparison of Kerberos to other
authentication mechanisms and why they were rejected in favour of Kerberos.
PROS:
* Since it does not depend on the cloud provider and/or k8s or bare-metal etc.
deployment, it's the biggest plus
* Centralized with tools and no need to write tons of tools around
* There are clients/tools on almost all OS-es and several languages
* Super huge users have been using it for years in production w/o huge issues
* Provides cross-realm trust possibility amongst other features
* Several open source components use it, which could increase compatibility

CONS:
* Not everybody is using Kerberos
* It would increase the code footprint but this is true for many features
(as a side note I'm here to maintain it)

Feel free to add your points because it only represents a single viewpoint.
Also if you have any better option for strong authentication please share
it and we can consider the pros/cons here.

BR,
G


On Fri, Jun 4, 2021 at 10:32 AM Till Rohrmann  wrote:

> I did not mean for the user to sign its own certificates but for the
> operator of the cluster. Once the user request hits the proxy, it should no
> longer be under his control. I think I do not fully understand yet why this
> would not work.
>
> What I would like to avoid is to add more complexity into Flink if there
> is an easy solution which fulfills the requirements. That's why I would
> like to exercise thoroughly through the different alternatives. Also, I am
> missing a bit the comparison of Kerberos to other authentication mechanisms
> and why they were rejected in favour of Kerberos.
>
> Cheers,
> Till
>
> On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra  wrote:
>
>> Hi!
>>
>> I think there might be possible alternatives but it seems Kerberos on the
>> rest endpoint ticks all the right boxes and provides a super clean and
>> simple solution for strong authentication.
>>
>> I wouldn’t even consider sidecar proxies etc if we can solve it in such a
>> simple way as proposed by G.
>>
>> Cheers
>> Gyula
>>
>> On Fri, 4 Jun 2021 at 10:03, Till Rohrmann  wrote:
>>
>>> I am not saying that we shouldn't add a strong authentication mechanism
>>> if there are good reasons for it. I primarily would like to understand the
>>> context a bit better in order to give qualified feedback and come to a good
>>> decision. In order to do this, I have the feeling that we haven't fully
>>> considered all available options which are on the table, tbh.
>>>
>>> Does the problem of certificate expiry also apply for self-signed
>>> certificates? If yes, then this should then also be a problem for the
>>> internal encryption of Flink's communication. If not, then one could use
>>> self-signed certificates with a longer validity to solve the mentioned
>>> issue.
>>>
>>> I think you can set up Flink in such a way that you don't have to handle
>>> all the different certificates. For example, you could deploy Flink with a
>>> "sidecar proxy" which is responsible for the authentication using an
>>> arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local
>>> network interface. That way, the REST endpoint would only be available
>>> through the sidecar proxy. Additionally, one could enable SSL for this
>>> communication. Would this be a solution for the problem?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi 
>>> wrote:
>>>
 That is an interesting idea, Till.

 The main issue with it is that TLS certificates have an expiration
 time, usually they get approved for a couple years. Forcing our users to
 restart jobs to reprovision TLS certificates would be weird when we could
 just implement a single proper strong authentication mechanism instead in a
 couple hundred lines of code. :-)

 In many cases it is also impractical to go the TLS mutual route,
 because the Flink Dashboard can end up on any node in the k8s/Yarn cluster
 which means that we need a certificate per node (due to the mutual auth),
 but if we also want to protect the private key of these from users
 accidentally or intentionally leaking them then we need this per user. As
 in we end up managing user*machine number certificates and having to renew
 them periodically, which albeit automatable is unfortunately not yet
 automated in all large organizations.

 I fully agree that TLS certificate mutual authentication has its nice
 

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Eron Wright
I believe that the correctness of watermarks and stream status markers is
determined entirely by the source (ignoring the generic assigner).  Such
stream elements are known not to overtake records, and aren't transient
from a pipeline perspective.  I do agree that recoveries may be lossy if
some operator state is transient (e.g. valve state).

Consider that status markers already affect the flow of watermarks (e.g.
suppression), and thus affect operator behavior.  Seems to me that exposing
the idleness state is no different than exposing a watermark.

The high-level story is, there is a need for the Flink job to be
transparent or neutral with respect to the event time clock.  I believe
this is possible if time flows with high fidelity from source to sink.  Of
course, one always has the choice as to whether to use source-based
watermarks; as you mentioned, requirements vary.

Regarding the Pulsar specifics, we're working on a community proposal that
I'm anxious to share.  To answer your question, the broker aggregates
watermarks from multiple producers who are writing to a single topic.
Each sink subtask is a producer. The broker considers each producer's assertions
(watermarks, idleness) to be independent inputs, much like the case with
the watermark valve.
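
As a rough illustration (a plain Java sketch, not the actual broker code), the
aggregation amounts to a minimum over the currently active producers:

import java.util.HashMap;
import java.util.Map;

class WatermarkAggregator {
    private final Map<String, Long> watermarkPerProducer = new HashMap<>();

    void onWatermark(String producerId, long watermark) {
        watermarkPerProducer.put(producerId, watermark);
    }

    void onIdle(String producerId) {
        // an idle producer no longer holds back the combined watermark
        watermarkPerProducer.remove(producerId);
    }

    long combinedWatermark() {
        // minimum over all active producers; Long.MAX_VALUE when everyone is idle
        return watermarkPerProducer.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MAX_VALUE);
    }
}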

On your concern about idleness causing false late events, I understand your
point but don't think it applies if the keyspace assignments are stable.

I hope this explains to your satisfaction.

- Eron





On Fri, Jun 4, 2021, 12:07 AM Dawid Wysakowicz 
wrote:

> Hi Eron,
>
> I might be missing some background on Pulsar partitioning but something
> seems off to me. What is the chunk/batch/partition that Pulsar brokers
> will additionally combine watermarks for? Isn't it the case that only a
> single Flink sub-task would write to such a chunk and thus will produce
> an aggregated watermark already via the writeWatermark method?
>
> Personally I am really skeptical about exposing the StreamStatus in any
> Producer API. In my understanding the StreamStatus is a transient
> setting of a consumer of data. StreamStatus is a mechanism for making a
> tradeoff between correctness (how many late elements that are behind
> watermark we have) vs making progress. IMO one has to be extra cautious
> when it comes to persistent systems. Again I might be missing the exact
> use case you are trying to solve here, but I can imagine multiple jobs
> reading from such a stream which might have different correctness
> requirements. Just quickly throwing an idea out of my head you might
> want to have an entirely correct results which can be delayed for
> minutes, and a separate task that produces quick insights within
> seconds. Another thing to consider is that by the time the downstream
> job starts consuming the upstream one might have produced records to the
> previously idle chunk. Persisting the StreamStatus in such a scenario
> would add unnecessary false late events.
>
> In my understanding a StreamStatus makes sense only when talking about
> immediately consumed transient channels such as between operators within
> a single job.
>
> Best,
>
> Dawid
>
> On 03/06/2021 23:31, Eron Wright wrote:
> > I think the rationale for end-to-end idleness (i.e. between pipelines) is
> > the same as the rationale for idleness between operators within a
> > pipeline.   On the 'main issue' you mentioned, we entrust the source with
> > adapting to Flink's notion of idleness (e.g. in Pulsar source, it means
> > that no topics/partitions are assigned to a given sub-task); a similar
> > adaption would occur in the sink.  In other words, I think it reasonable
> > that a sink for a watermark-aware storage system has need for the
> idleness
> > signal.
> >
> > Let me explain how I would use it in Pulsar's sink.  Each sub-task is a
> > Pulsar producer, and is writing watermarks to a configured topic via the
> > Producer API.  The Pulsar broker aggregates the watermarks that are
> written
> > by each producer into a global minimum (similar to StatusWatermarkValve).
> > The broker keeps track of which producers are actively producing
> > watermarks, and a producer may mark itself as idle to tell the broker not
> > to wait for watermarks from it, e.g. when a producer is going offline.  I
> > had intended to mark the producer as idle when the sub-task is closing,
> but
> > now I see that it would be insufficient; the producer should also be
> idled
> > if the sub-task is idled.  Otherwise, the broker would wait indefinitely
> > for the idled sub-task to produce a watermark.
> >
> > Arvid, I think your original instincts were correct about idleness
> > propagation, and I hope I've demonstrated a practical use case.
> >
> >
> >
> > On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise  wrote:
> >
> >> When I was rethinking the idleness issue, I came to the conclusion that
> it
> >> should be inferred at the source of the respective downstream pipeline
> >> again.
> >>
> >> The main issue on propagating 

[jira] [Created] (FLINK-22873) Add ToC to configuration documentation

2021-06-04 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-22873:


 Summary: Add ToC to configuration documentation
 Key: FLINK-22873
 URL: https://issues.apache.org/jira/browse/FLINK-22873
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.1
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.13.2






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-04 Thread Yun Gao
Hi all,

Very thanks @Dawid for resuming the discussion and very thanks @Till for the 
summary ! (and very sorry for I missed the mail and do not response in time...)

I also agree that we could consider the global commits separately later, after we 
have addressed the final checkpoints, and also the other points as Till summarized. 
Currently the only cases that have used the cascade commit are the Table FileSystem 
and Hive connectors. I checked the code and found that currently they commit the 
last piece of data directly in endOfInput(). Although this might emit repeated 
records if there is a failover during job finishing, it avoids emitting the records 
in notifyCheckpointComplete() after endOfInput(); thus the modification to the 
operator lifecycle in final checkpoints would not cause compatibility problems for 
these connectors, and we do not need to modify them in the first place. 

2. Regarding the operator lifecycle, I also agree with the proposed changes. To 
sum up, I think the operator lifecycle would become 

endOfInput(1)
...
endOfInput(n)
flush() --> call UDF's flush method
if some operator requires final checkpoints
    snapshotState()
    notifyCheckpointComplete()
end if
close() --> call UDF's close method

Since currently the close() is only called in normal finish and dispose() will 
be called in both failover and normal case, for compatibility, I think we may
have to postpone the change to a single close() method to version 2.0 ? 

3. Regarding the name and position of the flush() method, I also agree that we 
will need a separate method to mark the termination of the whole stream for 
multiple-input streams. Would it also be ok if we made some modifications to the 
current BoundedXXInput interfaces, e.g.:

interface BoundedInput {
    void endInput(); // marks the end of the whole stream, as flush() does
}

@Deprecated // In the future we could remove this interface
interface BoundedOneInput extends BoundedInput {}

interface BoundedMultiInput extends BoundedInput {
    void endInput(int i);

    default void endInput() {} // For compatibility
}

If an operator/UDF does not care about the end of a single input, then it could 
directly implement the BoundedInput interface. The possible benefit to me is that 
we might be able to keep only one concept for marking the end of a stream, 
especially for operators with only one input. 
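
As a quick illustration of the intended flush-then-close lifecycle, here is a 
minimal sketch of a buffering UDF (hypothetical names mirroring the proposals 
above, not final API):

// Hypothetical sketch only; names follow the proposals above and are not final API.
import java.util.ArrayList;
import java.util.List;

interface Collector<T> {
    void collect(T record);
}

interface Flushable<T> {
    // called once after the last input and before the final checkpoint
    void flush(Collector<T> out);
}

class BufferingFunction implements Flushable<String>, AutoCloseable {
    private final List<String> buffer = new ArrayList<>();

    void invoke(String record) {
        buffer.add(record); // regular record processing
    }

    @Override
    public void flush(Collector<String> out) {
        buffer.forEach(out::collect); // emit everything still buffered
        buffer.clear();
    }

    @Override
    public void close() {
        // releases resources only; all pending data was already emitted in flush()
    }
}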

Many thanks for all the deep insights and discussions!

Best,
Yun


--
From:Dawid Wysakowicz 
Send Time:2021 Jun. 3 (Thu.) 21:21
To:dev ; Till Rohrmann ; Yun Gao 

Cc:Piotr Nowojski ; Guowei Ma ; 
Stephan Ewen 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi all,
Thanks for the very insightful discussion. I'd like to revive the effort of 
FLIP-147. First of all, from my side I'd like to say that I am really 
interested in helping that happen in the upcoming 1.14 release.
I agree with Till that the final checkpoints and global commits are mostly 
orthogonal. Similarly as Till, I'd suggest to first focus on the final 
checkpoints, while just keeping in mind we should not make assumptions that 
would make it impossible to implement the global commits. So far I do not see 
such risk from the discussion.
Going back to the final checkpoints issue. I think the only outstanding issue 
is which methods we want to use for flushing/closing both operators and UDFs 
just before performing the final checkpoint. As pointed out to me by Piotr, I 
am mentioning UDFs here as well, because we need a way for users using the 
Public API to benefit from the final checkpoint (bear in mind that e.g. 
TwoPhaseCommitSinkFunction which is implemented by our Kafka sink operates on 
the UDF level). Right now RichFunction has no method which could be called just 
before the final checkpoint that would say "flush" all intermediate state now 
and prepare for the final checkpoint. I'd suggest introducing an additional 
interface e.g. (name to be determined)
interface Flushable {
   void flush(Collector out)
}
Additionally we would need to introduce a similar method on the StreamOperator 
level. Currently we have two methods that are called at the end of operator 
lifecycle: 
* close
* dispose
The usage of the two methods is a bit confusing. Dispose is responsible for 
closing all open resources and is supposed to be called in case of a failure. 
On the other hand the close is a combination of a non-existent "flush" method 
we need and dispose for closing resources in case of a successful run. I'd 
suggest to clear it a bit. We would introduce a proper "flush" method which 
would be called in case of a successful finishing of an operator. Moreover we 
would make "close" deal only with closing any open resources, basically taking 
over the role of the dispose, which we would deprecate.
Lastly, I'd like to say why I think it is better introduce a new "flush" method 
instead of using the "endInput" 

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-04 Thread Till Rohrmann
I did not mean for the user to sign its own certificates but for the
operator of the cluster. Once the user request hits the proxy, it should no
longer be under his control. I think I do not fully understand yet why this
would not work.

What I would like to avoid is to add more complexity into Flink if there is
an easy solution which fulfills the requirements. That's why I would like
to exercise thoroughly through the different alternatives. Also, I am
missing a bit the comparison of Kerberos to other authentication mechanisms
and why they were rejected in favour of Kerberos.

Cheers,
Till

On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra  wrote:

> Hi!
>
> I think there might be possible alternatives but it seems Kerberos on the
> rest endpoint ticks all the right boxes and provides a super clean and
> simple solution for strong authentication.
>
> I wouldn’t even consider sidecar proxies etc if we can solve it in such a
> simple way as proposed by G.
>
> Cheers
> Gyula
>
> On Fri, 4 Jun 2021 at 10:03, Till Rohrmann  wrote:
>
>> I am not saying that we shouldn't add a strong authentication mechanism
>> if there are good reasons for it. I primarily would like to understand the
>> context a bit better in order to give qualified feedback and come to a good
>> decision. In order to do this, I have the feeling that we haven't fully
>> considered all available options which are on the table, tbh.
>>
>> Does the problem of certificate expiry also apply for self-signed
>> certificates? If yes, then this should then also be a problem for the
>> internal encryption of Flink's communication. If not, then one could use
>> self-signed certificates with a longer validity to solve the mentioned
>> issue.
>>
>> I think you can set up Flink in such a way that you don't have to handle
>> all the different certificates. For example, you could deploy Flink with a
>> "sidecar proxy" which is responsible for the authentication using an
>> arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local
>> network interface. That way, the REST endpoint would only be available
>> through the sidecar proxy. Additionally, one could enable SSL for this
>> communication. Would this be a solution for the problem?
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi 
>> wrote:
>>
>>> That is an interesting idea, Till.
>>>
>>> The main issue with it is that TLS certificates have an expiration time,
>>> usually they get approved for a couple years. Forcing our users to restart
>>> jobs to reprovision TLS certificates would be weird when we could just
>>> implement a single proper strong authentication mechanism instead in a
>>> couple hundred lines of code. :-)
>>>
>>> In many cases it is also impractical to go the TLS mutual route, because
>>> the Flink Dashboard can end up on any node in the k8s/Yarn cluster which
>>> means that we need a certificate per node (due to the mutual auth), but if
>>> we also want to protect the private key of these from users accidentally or
>>> intentionally leaking them then we need this per user. As in we end up
>>> managing user*machine number certificates and having to renew them
>>> periodically, which albeit automatable is unfortunately not yet automated
>>> in all large organizations.
>>>
>>> I fully agree that TLS certificate mutual authentication has its nice
>>> properties, especially at very large (multiple thousand node) clusters -
>>> but it has its own challenges too. Thanks for bringing it up.
>>>
>>> Happy to have this added to the rejected alternative list so that we
>>> have the full picture documented.
>>>
>>> On Thu, Jun 3, 2021 at 5:52 PM Till Rohrmann 
>>> wrote:
>>>
 I guess the idea would then be to let the proxy do the authentication
 job and only forward the request via an SSL mutually encrypted connection
 to the Flink cluster. Would this be possible? The beauty of this setup is
 in my opinion that this setup should work with all kinds of authentication
 mechanisms.

 Cheers,
 Till

 On Thu, Jun 3, 2021 at 3:12 PM Gabor Somogyi 
 wrote:

> Thanks for giving options to fulfil the need.
>
> Users are looking for a solution where users can be identified on the
> whole cluster and restrict access to resources/actions.
> A good example for such an action is cancelling other users running
> jobs.
>
> * SSL does provide mutual authentication, but once authentication has passed
> no user-based restrictions can be made.
> * The less problematic part is that generating/maintaining short-time-valid
> certificates would be hard (that's the reason KDC-like servers exist).
> Having long-time-valid certificates would widen the attack surface, but
> since the first concern is there this is just a cosmetic issue.
>
> All in all using TLS certificates is not sufficient in these
> environments unfortunately.
>
> BR,
> G
>
>

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-04 Thread Gyula Fóra
Hi!

I think there might be possible alternatives but it seems Kerberos on the
rest endpoint ticks all the right boxes and provides a super clean and
simple solution for strong authentication.

I wouldn’t even consider sidecar proxies etc if we can solve it in such a
simple way as proposed by G.

Cheers
Gyula

On Fri, 4 Jun 2021 at 10:03, Till Rohrmann  wrote:

> I am not saying that we shouldn't add a strong authentication mechanism if
> there are good reasons for it. I primarily would like to understand the
> context a bit better in order to give qualified feedback and come to a good
> decision. In order to do this, I have the feeling that we haven't fully
> considered all available options which are on the table, tbh.
>
> Does the problem of certificate expiry also apply for self-signed
> certificates? If yes, then this should then also be a problem for the
> internal encryption of Flink's communication. If not, then one could use
> self-signed certificates with a longer validity to solve the mentioned
> issue.
>
> I think you can set up Flink in such a way that you don't have to handle
> all the different certificates. For example, you could deploy Flink with a
> "sidecar proxy" which is responsible for the authentication using an
> arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local
> network interface. That way, the REST endpoint would only be available
> through the sidecar proxy. Additionally, one could enable SSL for this
> communication. Would this be a solution for the problem?
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi 
> wrote:
>
>> That is an interesting idea, Till.
>>
>> The main issue with it is that TLS certificates have an expiration time,
>> usually they get approved for a couple years. Forcing our users to restart
>> jobs to reprovision TLS certificates would be weird when we could just
>> implement a single proper strong authentication mechanism instead in a
>> couple hundred lines of code. :-)
>>
>> In many cases it is also impractical to go the TLS mutual route, because
>> the Flink Dashboard can end up on any node in the k8s/Yarn cluster which
>> means that we need a certificate per node (due to the mutual auth), but if
>> we also want to protect the private key of these from users accidentally or
>> intentionally leaking them then we need this per user. As in we end up
>> managing user*machine number certificates and having to renew them
>> periodically, which albeit automatable is unfortunately not yet automated
>> in all large organizations.
>>
>> I fully agree that TLS certificate mutual authentication has its nice
>> properties, especially at very large (multiple thousand node) clusters -
>> but it has its own challenges too. Thanks for bringing it up.
>>
>> Happy to have this added to the rejected alternative list so that we have
>> the full picture documented.
>>
>> On Thu, Jun 3, 2021 at 5:52 PM Till Rohrmann 
>> wrote:
>>
>>> I guess the idea would then be to let the proxy do the authentication
>>> job and only forward the request via an SSL mutually encrypted connection
>>> to the Flink cluster. Would this be possible? The beauty of this setup is
>>> in my opinion that this setup should work with all kinds of authentication
>>> mechanisms.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jun 3, 2021 at 3:12 PM Gabor Somogyi 
>>> wrote:
>>>
 Thanks for giving options to fulfil the need.

 Users are looking for a solution where users can be identified on the
 whole cluster and restrict access to resources/actions.
 A good example for such an action is cancelling other users running
 jobs.

 * SSL does provide mutual authentication, but once authentication has passed
 no user-based restrictions can be made.
 * The less problematic part is that generating/maintaining short-time-valid
 certificates would be hard (that's the reason KDC-like servers exist).
 Having long-time-valid certificates would widen the attack surface, but
 since the first concern is there this is just a cosmetic issue.

 All in all using TLS certificates is not sufficient in these
 environments unfortunately.

 BR,
 G


 On Thu, Jun 3, 2021 at 12:49 PM Till Rohrmann 
 wrote:

> Thanks for the information Gabor. If it is about securing the
> communication between the REST client and the REST server, then Flink
> already supports enabling mutual SSL authentication [1]. Would this be
> enough to secure the communication and to pass an audit?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/security/security-ssl/#external--rest-connectivity
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 10:33 AM Gabor Somogyi <
> gabor.g.somo...@gmail.com> wrote:
>
>> Hi Till,
>>
>> Since I'm working in security area 10+ years let me share my thought.
>> I would like to 

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-04 Thread Gabor Somogyi
Till, thanks for investing time in giving further options.
Marci, thanks for summarizing the use-case point of view.

We've arrived back at one of the original problems. Namely, if an attacker
gets access to a node it's possible to cancel other users' jobs (and more
can be done).
A self-signed certificate is almost no-op authentication in production
environments because any user can sign their own certificate and no third
party is involved.
This problem just can't be solved with SSL no matter from which point of
view we consider it.

BR,
G


On Fri, Jun 4, 2021 at 10:03 AM Till Rohrmann  wrote:

> I am not saying that we shouldn't add a strong authentication mechanism if
> there are good reasons for it. I primarily would like to understand the
> context a bit better in order to give qualified feedback and come to a good
> decision. In order to do this, I have the feeling that we haven't fully
> considered all available options which are on the table, tbh.
>
> Does the problem of certificate expiry also apply for self-signed
> certificates? If yes, then this should then also be a problem for the
> internal encryption of Flink's communication. If not, then one could use
> self-signed certificates with a longer validity to solve the mentioned
> issue.
>
> I think you can set up Flink in such a way that you don't have to handle
> all the different certificates. For example, you could deploy Flink with a
> "sidecar proxy" which is responsible for the authentication using an
> arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local
> network interface. That way, the REST endpoint would only be available
> through the sidecar proxy. Additionally, one could enable SSL for this
> communication. Would this be a solution for the problem?
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi 
> wrote:
>
>> That is an interesting idea, Till.
>>
>> The main issue with it is that TLS certificates have an expiration time,
>> usually they get approved for a couple years. Forcing our users to restart
>> jobs to reprovision TLS certificates would be weird when we could just
>> implement a single proper strong authentication mechanism instead in a
>> couple hundred lines of code. :-)
>>
>> In many cases it is also impractical to go the TLS mutual route, because
>> the Flink Dashboard can end up on any node in the k8s/Yarn cluster which
>> means that we need a certificate per node (due to the mutual auth), but if
>> we also want to protect the private key of these from users accidentally or
>> intentionally leaking them, then we need this per user. As in, we end up
>> managing (number of users × number of machines) certificates and having to renew
>> them periodically, which, albeit automatable, is unfortunately not yet automated
>> in all large organizations.
>>
>> I fully agree that TLS certificate mutual authentication has its nice
>> properties, especially at very large (multiple thousand node) clusters -
>> but it has its own challenges too. Thanks for bringing it up.
>>
>> Happy to have this added to the rejected alternative list so that we have
>> the full picture documented.
>>
>> On Thu, Jun 3, 2021 at 5:52 PM Till Rohrmann 
>> wrote:
>>
>>> I guess the idea would then be to let the proxy do the authentication
>>> job and only forward the request via an SSL mutually encrypted connection
>>> to the Flink cluster. Would this be possible? The beauty of this setup is
>>> in my opinion that this setup should work with all kinds of authentication
>>> mechanisms.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jun 3, 2021 at 3:12 PM Gabor Somogyi 
>>> wrote:
>>>
 Thanks for giving options to fulfil the need.

 Users are looking for a solution where users can be identified across the
 whole cluster and access to resources/actions can be restricted.
 A good example of such an action is cancelling other users' running jobs.

 * SSL does provide mutual authentication, but once authentication has passed
 there is no user identity on which restrictions can be based.
 * The less problematic part is that generating/maintaining short-lived
 certificates would be hard (that's the reason KDC-like servers exist).
 Having long-lived certificates would widen the attack surface, but since
 the first concern still stands, this is just a cosmetic issue.

 All in all using TLS certificates is not sufficient in these
 environments unfortunately.

 BR,
 G


 On Thu, Jun 3, 2021 at 12:49 PM Till Rohrmann 
 wrote:

> Thanks for the information Gabor. If it is about securing the
> communication between the REST client and the REST server, then Flink
> already supports enabling mutual SSL authentication [1]. Would this be
> enough to secure the communication and to pass an audit?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/security/security-ssl/#external--rest-connectivity
>
> Cheers,
> Till
>
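For reference, the mutual SSL setup referred to in the quoted messages above
is configuration-driven. The following is only a minimal sketch of setting it
programmatically, assuming the security.ssl.rest.* options from the linked
documentation; the keystore/truststore paths and passwords are placeholders.

import org.apache.flink.configuration.Configuration;

public final class RestMutualSslSketch {

    // Sketch only: the keys mirror what would go into flink-conf.yaml; all
    // paths and passwords below are placeholders.
    public static Configuration restMutualSslConfig() {
        Configuration conf = new Configuration();
        // Encrypt the REST endpoint and require client certificates (mutual auth).
        conf.setBoolean("security.ssl.rest.enabled", true);
        conf.setBoolean("security.ssl.rest.authentication-enabled", true);
        // Server identity plus the trust material used to verify clients.
        conf.setString("security.ssl.rest.keystore", "/path/to/rest.keystore");
        conf.setString("security.ssl.rest.keystore-password", "keystore-password");
        conf.setString("security.ssl.rest.key-password", "key-password");
        conf.setString("security.ssl.rest.truststore", "/path/to/rest.truststore");
        conf.setString("security.ssl.rest.truststore-password", "truststore-password");
        return conf;
    }
}

Note that, as argued above, this only establishes certificate-based trust; it
does not by itself give a per-user identity to restrict actions on.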

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-04 Thread Till Rohrmann
I am not saying that we shouldn't add a strong authentication mechanism if
there are good reasons for it. I primarily would like to understand the
context a bit better in order to give qualified feedback and come to a good
decision. In order to do this, I have the feeling that we haven't fully
considered all available options which are on the table, tbh.

Does the problem of certificate expiry also apply for self-signed
certificates? If yes, then this should also be a problem for the
internal encryption of Flink's communication. If not, then one could use
self-signed certificates with a longer validity to solve the mentioned
issue.

I think you can set up Flink in such a way that you don't have to handle
all the different certificates. For example, you could deploy Flink with a
"sidecar proxy" which is responsible for the authentication using an
arbitrary method (e.g. Kerberos) and then bind the REST endpoint to a local
network interface. That way, the REST endpoint would only be available
through the sidecar proxy. Additionally, one could enable SSL for this
communication. Would this be a solution for the problem?
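For illustration only: the "bind the REST endpoint to a local network
interface" part of such a setup boils down to configuration. A minimal sketch,
assuming the standard rest.bind-address / rest.port options (the sidecar proxy
itself, which would listen on the externally reachable address and forward
authenticated requests locally, is out of scope here):

import org.apache.flink.configuration.Configuration;

public final class LoopbackRestSketch {

    // Sketch only: restrict the REST/web endpoint to the loopback interface so
    // that only a co-located (sidecar) proxy can reach it.
    public static Configuration loopbackRestConfig() {
        Configuration conf = new Configuration();
        conf.setString("rest.bind-address", "127.0.0.1");
        conf.setInteger("rest.port", 8081);
        return conf;
    }
}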

Cheers,
Till

On Thu, Jun 3, 2021 at 10:46 PM Márton Balassi 
wrote:

> That is an interesting idea, Till.
>
> The main issue with it is that TLS certificates have an expiration time,
> usually they get approved for a couple years. Forcing our users to restart
> jobs to reprovision TLS certificates would be weird when we could just
> implement a single proper strong authentication mechanism instead in a
> couple hundred lines of code. :-)
>
> In many cases it is also impractical to go the TLS mutual route, because
> the Flink Dashboard can end up on any node in the k8s/Yarn cluster which
> means that we need a certificate per node (due to the mutual auth), but if
> we also want to protect the private key of these from users accidentally or
> intentionally leaking them, then we need this per user. As in, we end up
> managing (number of users × number of machines) certificates and having to renew
> them periodically, which, albeit automatable, is unfortunately not yet automated
> in all large organizations.
>
> I fully agree that TLS certificate mutual authentication has its nice
> properties, especially at very large (multiple thousand node) clusters -
> but it has its own challenges too. Thanks for bringing it up.
>
> Happy to have this added to the rejected alternative list so that we have
> the full picture documented.
>
> On Thu, Jun 3, 2021 at 5:52 PM Till Rohrmann  wrote:
>
>> I guess the idea would then be to let the proxy do the authentication job
>> and only forward the request via an SSL mutually encrypted connection to
>> the Flink cluster. Would this be possible? The beauty of this setup is in
>> my opinion that this setup should work with all kinds of authentication
>> mechanisms.
>>
>> Cheers,
>> Till
>>
>> On Thu, Jun 3, 2021 at 3:12 PM Gabor Somogyi 
>> wrote:
>>
>>> Thanks for giving options to fulfil the need.
>>>
>>> Users are looking for a solution where users can be identified across the
>>> whole cluster and access to resources/actions can be restricted.
>>> A good example of such an action is cancelling other users' running jobs.
>>>
>>> * SSL does provide mutual authentication, but once authentication has passed
>>> there is no user identity on which restrictions can be based.
>>> * The less problematic part is that generating/maintaining short-lived
>>> certificates would be hard (that's the reason KDC-like servers exist).
>>> Having long-lived certificates would widen the attack surface, but since
>>> the first concern still stands, this is just a cosmetic issue.
>>>
>>> All in all using TLS certificates is not sufficient in these
>>> environments unfortunately.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Thu, Jun 3, 2021 at 12:49 PM Till Rohrmann 
>>> wrote:
>>>
 Thanks for the information Gabor. If it is about securing the
 communication between the REST client and the REST server, then Flink
 already supports enabling mutual SSL authentication [1]. Would this be
 enough to secure the communication and to pass an audit?

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/security/security-ssl/#external--rest-connectivity

 Cheers,
 Till

 On Thu, Jun 3, 2021 at 10:33 AM Gabor Somogyi <
 gabor.g.somo...@gmail.com> wrote:

> Hi Till,
>
> Since I've been working in the security area for 10+ years, let me share my
> thoughts. I would like to emphasise that there are experts better than me,
> but I have some basics.
> The discussion is open and I'm not trying to settle things alone...
>
> > I mean if an attacker can get access to one of the machines, then it
> should also be possible to obtain the right Kerberos token.
> Not necessarily. For example, if one gets access to a specific user's
> credentials then it's not possible to compromise other users' jobs, data,
> etc...
> Security is like an onion, 

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-04 Thread Dawid Wysakowicz
Hi Eron,

I might be missing some background on Pulsar partitioning but something
seems off to me. What is the chunk/batch/partition that Pulsar brokers
will additionally combine watermarks for? Isn't it the case that only a
single Flink sub-task would write to such a chunk and thus would already
produce an aggregated watermark via the writeWatermark method?

Personally I am really skeptical about exposing the StreamStatus in any
Producer API. In my understanding the StreamStatus is a transient
setting of a consumer of data. StreamStatus is a mechanism for making a
tradeoff between correctness (how many late elements, i.e. elements behind
the watermark, we accept) and making progress. IMO one has to be extra
cautious when it comes to persistent systems. Again I might be missing the
exact use case you are trying to solve here, but I can imagine multiple jobs
reading from such a stream which might have different correctness
requirements. Just quickly throwing out an idea: you might want entirely
correct results which can be delayed for minutes, and a separate task that
produces quick insights within seconds. Another thing to consider is that by
the time the downstream job starts consuming, the upstream one might have
produced records into the previously idle chunk. Persisting the StreamStatus
in such a scenario would add unnecessary false late events.

In my understanding a StreamStatus makes sense only when talking about
immediately consumed transient channels such as between operators within
a single job.

Best,

Dawid

On 03/06/2021 23:31, Eron Wright wrote:
> I think the rationale for end-to-end idleness (i.e. between pipelines) is
> the same as the rationale for idleness between operators within a
> pipeline.   On the 'main issue' you mentioned, we entrust the source with
> adapting to Flink's notion of idleness (e.g. in Pulsar source, it means
> that no topics/partitions are assigned to a given sub-task); a similar
> adaptation would occur in the sink.  In other words, I think it reasonable
> that a sink for a watermark-aware storage system has need for the idleness
> signal.
>
> Let me explain how I would use it in Pulsar's sink.  Each sub-task is a
> Pulsar producer, and is writing watermarks to a configured topic via the
> Producer API.  The Pulsar broker aggregates the watermarks that are written
> by each producer into a global minimum (similar to StatusWatermarkValve).
> The broker keeps track of which producers are actively producing
> watermarks, and a producer may mark itself as idle to tell the broker not
> to wait for watermarks from it, e.g. when a producer is going offline.  I
> had intended to mark the producer as idle when the sub-task is closing, but
> now I see that it would be insufficient; the producer should also be idled
> if the sub-task is idled.  Otherwise, the broker would wait indefinitely
> for the idled sub-task to produce a watermark.
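For illustration only (not Pulsar's or Flink's actual code): the broker-side
aggregation described above, i.e. a per-producer minimum that excludes
producers which have marked themselves idle, could be sketched roughly like
this. Names and the producer-id type are assumptions.

import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Illustrative sketch: track the latest watermark per active producer and
// expose the minimum; idle producers are excluded so they cannot hold the
// global minimum back indefinitely.
class MinWatermarkAggregator {

    private final Map<String, Long> watermarksByActiveProducer = new HashMap<>();

    void onWatermark(String producerId, long watermark) {
        // Keep each producer's watermark monotonically non-decreasing.
        watermarksByActiveProducer.merge(producerId, watermark, Long::max);
    }

    void onIdle(String producerId) {
        watermarksByActiveProducer.remove(producerId);
    }

    // Global minimum over all non-idle producers; empty if none are active.
    OptionalLong globalWatermark() {
        return watermarksByActiveProducer.values().stream()
                .mapToLong(Long::longValue)
                .min();
    }
}

A producer going offline or being idled would call onIdle, matching the
behaviour described in the quoted paragraph.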
>
> Arvid, I think your original instincts were correct about idleness
> propagation, and I hope I've demonstrated a practical use case.
>
>
>
> On Thu, Jun 3, 2021 at 12:49 PM Arvid Heise  wrote:
>
>> When I was rethinking the idleness issue, I came to the conclusion that it
>> should be inferred at the source of the respective downstream pipeline
>> again.
>>
>> The main issue with propagating idleness is that you would force the same
>> definition across all downstream pipelines, which may not be what the user
>> intended.
>> On the other hand, I don't immediately see a technical reason why the
>> downstream source wouldn't be able to infer that.
>>
>>
>>> On Thu, Jun 3, 2021 at 9:14 PM Eron Wright  wrote:
>>
>>> Thanks Piotr for bringing this up.  I reflected on this and I agree we
>>> should expose idleness, otherwise a multi-stage flow could stall.
>>>
>>> Regarding the latency markers, I don't see an immediate need for
>>> propagating them, because they serve to estimate latency within a
>>> pipeline,
>>> not across pipelines.  One would probably need to enhance the source
>>> interface also to do e2e latency.  Seems we agree this aspect is out of
>>> scope.
>>>
>>> I took a look at the code to get a sense of how to accomplish this.  The
>>> gist is a new `markIdle` method on the `StreamOperator` interface, that
>> is
>>> called when the stream status maintainer (the `OperatorChain`)
>> transitions
>>> to idle state.  Then, a new `markIdle` method on the `SinkFunction` and
>>> `SinkWriter` that is called by the respective operators.   Note that
>>> StreamStatus is an internal class.
>>>
>>> Here's a draft PR (based on the existing PR of FLINK-22700) to highlight
>>> this new aspect:
>>> https://github.com/streamnative/flink/pull/2/files
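A rough, hypothetical sketch of the sink-side hooks described in the quoted
paragraph, i.e. a writer that receives watermarks and an idleness signal. The
interface name and signatures here are illustrative only; the actual proposed
changes are in the linked draft PR.

import org.apache.flink.api.common.eventtime.Watermark;

// Hypothetical sketch only; see the linked draft PR for the real changes.
interface WatermarkAwareWriter<T> {

    void write(T element) throws Exception;

    // Invoked with the sub-task's current watermark (the FLIP-167 part).
    default void writeWatermark(Watermark watermark) throws Exception {}

    // Invoked when the sub-task's stream status switches to idle, so that an
    // external system (e.g. a broker aggregating watermarks) stops waiting
    // for this writer.
    default void markIdle() throws Exception {}
}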
>>>
>>> Please let me know if you'd like me to proceed to update the FLIP with
>>> these details.
>>>
>>> Thanks again,
>>> Eron
>>>
>>> On Thu, Jun 3, 2021 at 7:56 AM Piotr Nowojski 
>>> wrote:
>>>
 Hi,

 Sorry for chipping in late in the discussion, but I would second this
>>> 

Re: [Discuss] Planning Flink 1.14

2021-06-04 Thread Prasanna kumar
Hi all,

We are using Flink for our eventing system. Overall we are very happy with
the tech, the documentation, and the community's support and quick replies on
the mailing list.

My experience with versions over the last year:

We were working on 1.10 initially during our research phase, then stabilised
on 1.11 as we moved on, but by the time we were about to get into production
1.12 was released. As with all software and products, there were bugs
reported. So we waited till 1.12.2 was released and then upgraded. Within a
month of us doing that, 1.13 got released.

But based on past experience, we wait until at least a couple of minor
(bugfix) versions get released before we move onto a newer version.
Development happens at a rapid pace in Flink (which is good in terms of
features), but adopting and moving the production code to a newer version
3-4 times a year is an onerous effort. For example, the memory model was
changed in one of the releases (there is good documentation for it). But for
a production user to adopt the newer version, at least a month of testing is
required in a large-scale environment. We also do not want to be behind more
than 2 versions at any point in time.

I personally feel 2 major releases a year, or at most a release every 5
months, is good.

Thanks
Prasanna.

On Fri, Jun 4, 2021 at 9:38 AM Xintong Song  wrote:

> Thanks everyone for the feedback.
>
> @Jing,
> Thanks for the inputs. Could you please ask a committer who works together
> with you on these items to fill them into the feature collecting wiki page
> [1]? I assume Jark, who co-edited the flip wiki page, is working with you?
>
> @Kurt, @Till and @Seth,
> First of all, a few things that potentially demotivate users from
> upgrading, observed from users that I've been in touch with.
> 1. It takes time for Flink major releases to get stabilized. Many users
> tend to wait for the bugfix releases (x.y.1/2, or even x.y.3/4) rather
> than upgrading to x.y.0 immediately. This could take months, sometimes even
> after the next major release.
> 2. Many users maintain an internal version of Flink, with customized
> features for their specific businesses. For them, upgrading Flink requires
> significant efforts to rebase those customized features. On the other hand,
> the more versions they are left behind, the harder it is to contribute those
> features to the community, becoming a vicious cycle.
>
> I think the question to be answered is how do we prioritize between
> stabilizing a previous major release and cutting a new major release. So
> far, it feels like the new release takes priority. I recall that we waited
> for weeks to release 1.11.3 because people were busy stabilizing 1.12.0.
> What if more resources leaned towards the bugfix releases? We could have a
> more explicit schedule for the bugfix releases. E.g., try to always release
> the first bugfix release 2 weeks after the major release, the second bugfix
> release 4 weeks after that, and release on-demand starting from the third
> bugfix release. Or some other rules like this. Would that help speed up the
> stabilization of releases and give users more confidence to upgrade
> earlier?
>
> A related question is how do we prioritize between cutting a release and
> motivating more contributors. In my experience, what Kurt described, that
> committers cannot help contributors due to "planned features", usually
> happens during the release testing period or right before that (when people
> are struggling to make the feature freeze). This probably indicates that
> currently cutting a release on time is prioritized over the contributors'
> experience. Do we need to change that?
>
> If extending the release period does not simply mean that more features are
> pushed into each release, but rather allows a longer period for the release
> to get stabilized while leaving more capacity for bugfix releases and for
> helping contributors, it might be a good idea. To be specific, currently we
> have a 4-month period split into 3 months of feature development + 1 month
> of release testing. We might consider a 5-month period as 3 months of
> feature development + 2 months of release testing.
>
> To sum up, I'm leaning towards extending the overall release period a bit,
> while keeping the period before feature freeze. WDYT?
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/1.14+Release
>
> On Thu, Jun 3, 2021 at 9:00 PM Seth Wiesman  wrote:
>
> > Hi Everyone,
> >
> > +1 for the Release Managers. Thank you all for volunteering.
> >
> > @Till Rohrmann  A common sentiment that I have
> heard
> > from many users is that upgrading off of 1.9 was very difficult. In
> > particular, a lot of people struggled to understand the new memory model.
> > Many users who required custom memory configurations in earlier versions
> > assumed they should carry those configurations into later versions and
> > then found themselves with OOM and instability issues. The good news is
> > Flink