[jira] [Created] (FLINK-30038) HiveE2E test is not stable

2022-11-15 Thread Shammon (Jira)
Shammon created FLINK-30038:
---

 Summary: HiveE2E test is not stable
 Key: FLINK-30038
 URL: https://issues.apache.org/jira/browse/FLINK-30038
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Affects Versions: table-store-0.3.0
Reporter: Shammon


https://github.com/apache/flink-table-store/actions/runs/3476726197/jobs/5812201704

Caused by: org.testcontainers.containers.ContainerLaunchException: Timed out 
waiting for log output matching '.*Starting HiveServer2.*'
at 
org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy.waitUntilReady(LogMessageWaitStrategy.java:49)
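
If the container is merely slow rather than broken, one common mitigation is to extend the readiness wait. A pseudocode sketch modeled on the Testcontainers Java API (the ten-minute timeout is an arbitrary assumption, and `hiveContainer` is a hypothetical name):

```
// Sketch: wait longer for the HiveServer2 log line before
// declaring the container launch failed.
hiveContainer.waitingFor(
        Wait.forLogMessage(".*Starting HiveServer2.*", 1)
                .withStartupTimeout(Duration.ofMinutes(10)));
```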



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30037) Improve the efficiency of Flink ML Python CI

2022-11-15 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30037:


 Summary: Improve the efficiency of Flink ML Python CI
 Key: FLINK-30037
 URL: https://issues.apache.org/jira/browse/FLINK-30037
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Affects Versions: ml-2.1.0
Reporter: Yunfeng Zhou


It currently takes about thirty minutes to run Flink ML's Python CI [1], which 
noticeably slows down Flink ML development. We should therefore reduce the 
total execution time of the Flink ML Python CI.

[1] https://github.com/apache/flink-ml/actions/runs/3475256961





[jira] [Created] (FLINK-30036) Force delete pod when k8s node is not ready

2022-11-15 Thread Peng Yuan (Jira)
Peng Yuan created FLINK-30036:
-

 Summary: Force delete pod when k8s node is not ready
 Key: FLINK-30036
 URL: https://issues.apache.org/jira/browse/FLINK-30036
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: Peng Yuan








[jira] [Created] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session

2022-11-15 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-30035:
--

 Summary: ./bin/sql-client.sh won't import external jar into the 
session
 Key: FLINK-30035
 URL: https://issues.apache.org/jira/browse/FLINK-30035
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.16.0
Reporter: Steven Zhen Wu


I used to be able to run the sql-client with the iceberg-flink-runtime jar using 
the `-j,--jar` option (e.g. with 1.15.2):
```
./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
```

With 1.16.0, this doesn't work anymore. As a result, I am seeing a 
ClassNotFoundException:
```
java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
```

I have to put the `iceberg-flink-runtime-1.16-1.1.0.jar` file inside the 
`flink/lib` directory to get the jar loaded.
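
As a workaround sketch in shell (the paths and jar name here are assumptions; the point is that with 1.16.0 the jar must sit in `flink/lib` before the client starts):

```shell
# Hypothetical workaround for FLINK-30035: put the connector jar on the
# cluster classpath (flink/lib) instead of passing it via --jar.
FLINK_HOME=${FLINK_HOME:-"$PWD/flink"}   # assumption: Flink unpacked here
JAR=iceberg-flink-runtime-1.16-1.1.0.jar
mkdir -p "$FLINK_HOME/lib"
if cp "$JAR" "$FLINK_HOME/lib/" 2>/dev/null; then
  echo "copied $JAR into $FLINK_HOME/lib"
else
  echo "copy skipped: $JAR not found in the current directory"
fi
# Then start the client without --jar:
# "$FLINK_HOME/bin/sql-client.sh"
```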





[jira] [Created] (FLINK-30034) class not found exception when using Hive 3.1.3 connector

2022-11-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-30034:


 Summary: class not found exception when using Hive 3.1.3 connector
 Key: FLINK-30034
 URL: https://issues.apache.org/jira/browse/FLINK-30034
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: luoyuxia








Re: [DISCUSS] Allow sharing (RocksDB) memory between slots

2022-11-15 Thread Xintong Song
Concerning isolation, I think ideally we want everything to be isolated
between jobs running in the same cluster (i.e., slots in the same TM).
Unfortunately, this is impractical.
- Heap / off-heap memory is allocated / deallocated directly through the JVM /
OS. Flink does not have a good way to cap its usage per slot.
- Network memory lacks the useful property of managed memory that a job can
adapt to any given amount of it (above a very small minimum). We are trying
to move network memory in that direction, and once that is achieved it can be
isolated as well.
Not being able to isolate all kinds of memory does not mean we should give up
isolation on the kinds we can. And I believe "managed memory is isolated and
other memory is not" is much easier for users to understand than "part of the
managed memory is isolated and the rest is not".

By waste, I meant reserving a certain amount of memory that is only used by
certain use cases that do not always exist. This is exactly what we want to
avoid with managed memory in FLIP-49 [1]. We used to have managed memory
only used for batch operators, and a containerized-cut-off memory
(something similar to framework.off-heap) for rocksdb state backend. The
problem was that, if the user does not change the configuration when
switching between streaming / batch jobs, there would always be some memory
(managed or cut-off) wasted. Similarly, introducing a shared managed memory
zone means reserving one more dedicated part of memory that can get wasted
in many cases. This is probably a necessary price for this new feature, but
let's not break the concept / properties of managed memory for it.

In your proposal, the fraction for the shared managed memory is 0 by default.
That means that to enable RocksDB memory sharing, users need to manually
increase the fraction anyway. Thus, having the memory-sharing RocksDB use
managed memory or off-heap memory does not make a significant difference
for the new feature users. I'd think of this as "extra operational overhead
for users of a certain new feature" vs. "significant learning cost and
potential behavior change for pretty much all users". I'd be fine with
having some shortcuts to simplify the configuration on the user side for
this new feature, but not to invade the managed memory.
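
For reference, the existing knobs this discussion revolves around look like the following in `flink-conf.yaml` (values are illustrative; the proposed shared-memory fraction is not an existing option and is deliberately omitted):

```
# flink-conf.yaml (illustrative values)
taskmanager.memory.managed.fraction: 0.4     # share of Flink memory that is managed
state.backend.rocksdb.memory.managed: true   # RocksDB draws its budget from managed memory
```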

Best,

Xintong


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors

On Tue, Nov 15, 2022 at 5:46 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Thanks for your reply Xingong Song,
>
> Could you please elaborate on the following:
>
> > The proposed changes break several good properties that we designed for
> > managed memory.
> > 1. It's isolated across slots
> Just to clarify, any way to manage the memory efficiently while capping its
> usage
> will break the isolation. It's just a matter of whether it's managed memory
> or not.
> Do you see any reasons why unmanaged memory can be shared, and managed
> memory can not?
>
> > 2. It should never be wasted (unless there's nothing in the job that
> needs
> > managed memory)
> If I understand correctly, the managed memory can already be wasted because
> it is divided evenly between slots, regardless of the existence of its
> consumers in a particular slot.
> And in general, even if every slot has RocksDB / python, it's not
> guaranteed equal consumption.
> So this property would rather be fixed in the current proposal.
>
> > In addition, it further complicates configuration / computation logics of
> > managed memory.
> I think having multiple options overriding each other increases the
> complexity for the user. As for the computation, I think it's desirable to
> let Flink do it rather than users.
>
> Both approaches need some help from TM for:
> - storing the shared resources (static field in a class might be too
> dangerous because if the backend is loaded by the user-class-loader then
> memory will leak silently).
> - reading the configuration
>
> Regards,
> Roman
>
>
> On Sun, Nov 13, 2022 at 11:24 AM Xintong Song 
> wrote:
>
> > I like the idea of sharing RocksDB memory across slots. However, I'm
> quite
> > concerned by the current proposed approach.
> >
> > The proposed changes break several good properties that we designed for
> > managed memory.
> > 1. It's isolated across slots
> > 2. It should never be wasted (unless there's nothing in the job that
> needs
> > managed memory)
> > In addition, it further complicates configuration / computation logics of
> > managed memory.
> >
> > As an alternative, I'd suggest introducing a variant of
> > RocksDBStateBackend, that shares memory across slots and does not use
> > managed memory. This basically means the shared memory is not considered
> as
> > part of managed memory. For users of this new feature, they would need to
> > configure how much memory the variant state backend should use, and
> > probably also a larger 

[jira] [Created] (FLINK-30033) Add primary key data type validation

2022-11-15 Thread Shammon (Jira)
Shammon created FLINK-30033:
---

 Summary: Add primary key data type validation
 Key: FLINK-30033
 URL: https://issues.apache.org/jira/browse/FLINK-30033
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.3.0
Reporter: Shammon


Add primary key data type validation; for the rules, the table store can refer to Hive's in 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-CreateTableCreate/Drop/TruncateTable





Re: [DISCUSS] Release Flink 1.15.3

2022-11-15 Thread Danny Cranmer
Hey Fabian,

I am out this week, I can take a look Monday if still required.

Thanks,
Danny

On Tue, Nov 15, 2022 at 8:37 PM Martijn Visser 
wrote:

> Hi Fabian,
>
> I'll try to have a look tomorrow.
>
> Cheers, Martijn
>
> On Tue, Nov 15, 2022 at 6:44 PM Fabian Paul  wrote:
>
> > Hi all,
> >
> > The release vote for 1.15.3-rc1 is open [1]. Unfortunately, I am still
> > missing some votes
> > and would kindly ask for your help to make this release happen :)
> >
> > Best,
> > Fabian
> >
> > [1] https://lists.apache.org/thread/73l524189mpyrjokzxwb5smt80582pw1
> >
> > On Thu, Nov 10, 2022 at 7:28 PM Martijn Visser  >
> > wrote:
> > >
> > > Hi Fabian,
> > >
> > > I've added 1.15.4 as a new release version.
> > >
> > > Thanks, Martijn
> > >
> > > On Thu, Nov 10, 2022 at 5:18 PM Fabian Paul <
> fabian.p...@databricks.com.invalid>
> > wrote:
> > >>
> > >> I conclude that the community has accepted another release, and I will
> > open
> > >> the voting thread shortly. Can someone with PMC rights add 1.15.4 as a
> > new
> > >> release version in JIRA [1] so that I can update the still open
> tickets?
> > >>
> > >> Best,
> > >> Fabian
> > >>
> > >> [1]
> > >>
> >
> https://issues.apache.org/jira/plugins/servlet/project-config/FLINK/versions
> > >>
> > >> On Wed, Nov 2, 2022 at 2:07 PM Fabian Paul  wrote:
> > >>
> > >> > Thanks for all the replies. @xintong I'll definitely come back to
> your
> > >> > offer when facing steps that require PMC rights for the release.
> > >> >
> > >> > I checked the JIRA and found four blocking/critical issues affecting
> > 1.15.2
> > >> >
> > >> > - FLINK-29830 
> > >> > - FLINK-29492 
> > >> > - FLINK-29315 
> > >> > - FLINK-29234 
> > >> >
> > >> > I'll reach out to the ticket owners to get their opinion about the
> > current
> > >> > status. In case, someone knows of some pending fixes that I haven't
> > >> > mentioned please let me know.
> > >> >
> > >> > Best,
> > >> > Fabian
> > >> >
> > >> > On Wed, Oct 26, 2022 at 2:01 PM Konstantin Knauf  >
> > >> > wrote:
> > >> >
> > >> >> +1, thanks Fabian.
> > >> >>
> > >> >> Am Mi., 26. Okt. 2022 um 08:26 Uhr schrieb Danny Cranmer <
> > >> >> dannycran...@apache.org>:
> > >> >>
> > >> >> > +1, thanks for driving this Fabian.
> > >> >> >
> > >> >> > Danny,
> > >> >> >
> > >> >> > On Wed, Oct 26, 2022 at 2:22 AM yuxia <
> luoyu...@alumni.sjtu.edu.cn
> > >
> > >> >> wrote:
> > >> >> >
> > >> >> > > Thanks for driving this.
> > >> >> > > +1 for release 1.15.3
> > >> >> > >
> > >> >> > > Best regards,
> > >> >> > > Yuxia
> > >> >> > >
> > >> >> > > - 原始邮件 -
> > >> >> > > 发件人: "Leonard Xu" 
> > >> >> > > 收件人: "dev" 
> > >> >> > > 发送时间: 星期二, 2022年 10 月 25日 下午 10:00:47
> > >> >> > > 主题: Re: [DISCUSS] Release Flink 1.15.3
> > >> >> > >
> > >> >> > > Thanks Fabian for driving this.
> > >> >> > >
> > >> >> > > +1 to release 1.15.3.
> > >> >> > >
> > >> >> > > The bug tickets FLINK-26394 and FLINK-27148 should be fixed as
> > well,
> > >> >> I’ll
> > >> >> > > help to address them soon.
> > >> >> > >
> > >> >> > > Best,
> > >> >> > > Leonard Xu
> > >> >> > >
> > >> >> > >
> > >> >> > >
> > >> >> > > > 2022年10月25日 下午8:28,Jing Ge  写道:
> > >> >> > > >
> > >> >> > > > +1 The timing is good to have 1.15.3 release. Thanks Fabian
> for
> > >> >> > bringing
> > >> >> > > > this to our attention.
> > >> >> > > >
> > >> >> > > > I just checked PRs and didn't find the 1.15 backport of
> > FLINK-29567
> > >> >> > > > . Please
> be
> > >> >> aware
> > >> >> > of
> > >> >> > > it.
> > >> >> > > > Thanks!
> > >> >> > > >
> > >> >> > > > Best regards,
> > >> >> > > > Jing
> > >> >> > > >
> > >> >> > > > On Tue, Oct 25, 2022 at 11:44 AM Xintong Song <
> > >> >> tonysong...@gmail.com>
> > >> >> > > wrote:
> > >> >> > > >
> > >> >> > > >> Thanks for bringing this up, Fabian.
> > >> >> > > >>
> > >> >> > > >> +1 for creating a 1.15.3 release. I've also seen users
> > requiring
> > >> >> this
> > >> >> > > >> version [1].
> > >> >> > > >>
> > >> >> > > >> I can help with actions that require a PMC role, if needed.
> > >> >> > > >>
> > >> >> > > >> Best,
> > >> >> > > >>
> > >> >> > > >> Xintong
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> [1]
> > >> >> https://lists.apache.org/thread/501q4l1c6gs8hwh433bw3v1y8fs9cw2n
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> On Tue, Oct 25, 2022 at 5:11 PM Fabian Paul <
> fp...@apache.org
> > >
> > >> >> wrote:
> > >> >> > > >>
> > >> >> > > >>> Hi all,
> > >> >> > > >>>
> > >> >> > > >>> I want to start the discussion of creating a new 1.15 patch
> > >> >> release
> > >> >> > > >>> (1.15.3). The last 1.15 release is almost two months old,
> and
> > >> >> since
> > >> >> > > then,
> > >> >> > > >>> ~60 tickets have been closed, targeting 1.15.3. It 

Re: [DISCUSS] Release Flink 1.15.3

2022-11-15 Thread Martijn Visser
Hi Fabian,

I'll try to have a look tomorrow.

Cheers, Martijn

On Tue, Nov 15, 2022 at 6:44 PM Fabian Paul  wrote:

> Hi all,
>
> The release vote for 1.15.3-rc1 is open [1]. Unfortunately, I am still
> missing some votes
> and would kindly ask for your help to make this release happen :)
>
> Best,
> Fabian
>
> [1] https://lists.apache.org/thread/73l524189mpyrjokzxwb5smt80582pw1
>
> On Thu, Nov 10, 2022 at 7:28 PM Martijn Visser 
> wrote:
> >
> > Hi Fabian,
> >
> > I've added 1.15.4 as a new release version.
> >
> > Thanks, Martijn
> >
> > On Thu, Nov 10, 2022 at 5:18 PM Fabian Paul 
> > 
> wrote:
> >>
> >> I conclude that the community has accepted another release, and I will
> open
> >> the voting thread shortly. Can someone with PMC rights add 1.15.4 as a
> new
> >> release version in JIRA [1] so that I can update the still open tickets?
> >>
> >> Best,
> >> Fabian
> >>
> >> [1]
> >>
> https://issues.apache.org/jira/plugins/servlet/project-config/FLINK/versions
> >>
> >> On Wed, Nov 2, 2022 at 2:07 PM Fabian Paul  wrote:
> >>
> >> > Thanks for all the replies. @xintong I'll definitely come back to your
> >> > offer when facing steps that require PMC rights for the release.
> >> >
> >> > I checked the JIRA and found four blocking/critical issues affecting
> 1.15.2
> >> >
> >> > - FLINK-29830 
> >> > - FLINK-29492 
> >> > - FLINK-29315 
> >> > - FLINK-29234 
> >> >
> >> > I'll reach out to the ticket owners to get their opinion about the
> current
> >> > status. In case, someone knows of some pending fixes that I haven't
> >> > mentioned please let me know.
> >> >
> >> > Best,
> >> > Fabian
> >> >
> >> > On Wed, Oct 26, 2022 at 2:01 PM Konstantin Knauf 
> >> > wrote:
> >> >
> >> >> +1, thanks Fabian.
> >> >>
> >> >> Am Mi., 26. Okt. 2022 um 08:26 Uhr schrieb Danny Cranmer <
> >> >> dannycran...@apache.org>:
> >> >>
> >> >> > +1, thanks for driving this Fabian.
> >> >> >
> >> >> > Danny,
> >> >> >
> >> >> > On Wed, Oct 26, 2022 at 2:22 AM yuxia  >
> >> >> wrote:
> >> >> >
> >> >> > > Thanks for driving this.
> >> >> > > +1 for release 1.15.3
> >> >> > >
> >> >> > > Best regards,
> >> >> > > Yuxia
> >> >> > >
> >> >> > > - 原始邮件 -
> >> >> > > 发件人: "Leonard Xu" 
> >> >> > > 收件人: "dev" 
> >> >> > > 发送时间: 星期二, 2022年 10 月 25日 下午 10:00:47
> >> >> > > 主题: Re: [DISCUSS] Release Flink 1.15.3
> >> >> > >
> >> >> > > Thanks Fabian for driving this.
> >> >> > >
> >> >> > > +1 to release 1.15.3.
> >> >> > >
> >> >> > > The bug tickets FLINK-26394 and FLINK-27148 should be fixed as
> well,
> >> >> I’ll
> >> >> > > help to address them soon.
> >> >> > >
> >> >> > > Best,
> >> >> > > Leonard Xu
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > > > 2022年10月25日 下午8:28,Jing Ge  写道:
> >> >> > > >
> >> >> > > > +1 The timing is good to have 1.15.3 release. Thanks Fabian for
> >> >> > bringing
> >> >> > > > this to our attention.
> >> >> > > >
> >> >> > > > I just checked PRs and didn't find the 1.15 backport of
> FLINK-29567
> >> >> > > > . Please be
> >> >> aware
> >> >> > of
> >> >> > > it.
> >> >> > > > Thanks!
> >> >> > > >
> >> >> > > > Best regards,
> >> >> > > > Jing
> >> >> > > >
> >> >> > > > On Tue, Oct 25, 2022 at 11:44 AM Xintong Song <
> >> >> tonysong...@gmail.com>
> >> >> > > wrote:
> >> >> > > >
> >> >> > > >> Thanks for bringing this up, Fabian.
> >> >> > > >>
> >> >> > > >> +1 for creating a 1.15.3 release. I've also seen users
> requiring
> >> >> this
> >> >> > > >> version [1].
> >> >> > > >>
> >> >> > > >> I can help with actions that require a PMC role, if needed.
> >> >> > > >>
> >> >> > > >> Best,
> >> >> > > >>
> >> >> > > >> Xintong
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> [1]
> >> >> https://lists.apache.org/thread/501q4l1c6gs8hwh433bw3v1y8fs9cw2n
> >> >> > > >>
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> On Tue, Oct 25, 2022 at 5:11 PM Fabian Paul  >
> >> >> wrote:
> >> >> > > >>
> >> >> > > >>> Hi all,
> >> >> > > >>>
> >> >> > > >>> I want to start the discussion of creating a new 1.15 patch
> >> >> release
> >> >> > > >>> (1.15.3). The last 1.15 release is almost two months old, and
> >> >> since
> >> >> > > then,
> >> >> > > >>> ~60 tickets have been closed, targeting 1.15.3. It includes
> >> >> critical
> >> >> > > >>> changes to the sink architecture, including:
> >> >> > > >>>
> >> >> > > >>> - Reverting the sink metric naming [1]
> >> >> > > >>> - Recovery problems for sinks using the GlobalCommitter
> [2][3][4]
> >> >> > > >>>
> >> >> > > >>> If the community agrees to create a new patch release, I
> could
> >> >> > > volunteer
> >> >> > > >> as
> >> >> > > >>> the release manager.
> >> >> > > >>>
> >> >> > > >>> Best,
> >> >> > > >>> Fabian
> >> >> > > >>>
> >> >> > > >>> [1] https://issues.apache.org/jira/browse/FLINK-29567
> >> 

[jira] [Created] (FLINK-30032) IOException during MAX_WATERMARK emission causes message missing

2022-11-15 Thread Haoze Wu (Jira)
Haoze Wu created FLINK-30032:


 Summary: IOException during MAX_WATERMARK emission causes message 
missing
 Key: FLINK-30032
 URL: https://issues.apache.org/jira/browse/FLINK-30032
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.0
Reporter: Haoze Wu


We are doing testing on Flink (version 1.14.0). We launch one 
StandaloneSessionClusterEntrypoint and two TaskManagerRunners. Then we run a Flink 
client which submits a WordCount workload. The code is similar to 
[https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java],
 and we only add a Kafka topic output:

 
{code:java}
    private static DataStreamSink<String> addKafkaSink(
            final DataStream<String> events, final String brokers, final String topic) {
        return events.sinkTo(
                KafkaSink.<String>builder()
                        .setBootstrapServers(brokers)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.<String>builder()
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .setTopic(topic)
                                        .build())
                        .build());
    }

    public static void run(final String[] args) throws Exception {
        final String brokers = args[0];
        final String textFilePath = args[1];
        final String kafkaTopic = args[2];
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final DataStream<String> text = env.readTextFile(textFilePath);
        final DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
        addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
        final long nano = System.nanoTime();
        env.execute("WordCount");
        FlinkGrayClientMain.reply("success", nano);
    }
{code}
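
For readers without the linked example open, the `Tokenizer` used above lowercases each line, splits it on non-word characters, and drops empty tokens; a plain-Java sketch of that logic (no Flink dependencies; the class name is ours):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class TokenizerSketch {
    // Mirrors the WordCount example's Tokenizer: lowercase the line, split on
    // non-word characters, and drop empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("To be, or not to be"));
        // prints [to, be, or, not, to, be]
    }
}
```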
We found that sometimes the Kafka topic fails to receive a few messages. We 
reproduced the symptom multiple times: the Kafka topic always receives 160-169 
messages while the expected number is 170, and the missing messages are always 
the last few of the 170 expected messages.

Then we inspect the logs and code.

First, we see an IOException on one of the TaskManagerRunners:

{code:java}
2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator 
(ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark 
while closing
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
        at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162)
 ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at 

Re: [DISCUSS] Release Flink 1.15.3

2022-11-15 Thread Fabian Paul
Hi all,

The release vote for 1.15.3-rc1 is open [1]. Unfortunately, I am still
missing some votes
and would kindly ask for your help to make this release happen :)

Best,
Fabian

[1] https://lists.apache.org/thread/73l524189mpyrjokzxwb5smt80582pw1

On Thu, Nov 10, 2022 at 7:28 PM Martijn Visser  wrote:
>
> Hi Fabian,
>
> I've added 1.15.4 as a new release version.
>
> Thanks, Martijn
>
> On Thu, Nov 10, 2022 at 5:18 PM Fabian Paul 
>  wrote:
>>
>> I conclude that the community has accepted another release, and I will open
>> the voting thread shortly. Can someone with PMC rights add 1.15.4 as a new
>> release version in JIRA [1] so that I can update the still open tickets?
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://issues.apache.org/jira/plugins/servlet/project-config/FLINK/versions
>>
>> On Wed, Nov 2, 2022 at 2:07 PM Fabian Paul  wrote:
>>
>> > Thanks for all the replies. @xintong I'll definitely come back to your
>> > offer when facing steps that require PMC rights for the release.
>> >
>> > I checked the JIRA and found four blocking/critical issues affecting 1.15.2
>> >
>> > - FLINK-29830 
>> > - FLINK-29492 
>> > - FLINK-29315 
>> > - FLINK-29234 
>> >
>> > I'll reach out to the ticket owners to get their opinion about the current
>> > status. In case, someone knows of some pending fixes that I haven't
>> > mentioned please let me know.
>> >
>> > Best,
>> > Fabian
>> >
>> > On Wed, Oct 26, 2022 at 2:01 PM Konstantin Knauf 
>> > wrote:
>> >
>> >> +1, thanks Fabian.
>> >>
>> >> Am Mi., 26. Okt. 2022 um 08:26 Uhr schrieb Danny Cranmer <
>> >> dannycran...@apache.org>:
>> >>
>> >> > +1, thanks for driving this Fabian.
>> >> >
>> >> > Danny,
>> >> >
>> >> > On Wed, Oct 26, 2022 at 2:22 AM yuxia 
>> >> wrote:
>> >> >
>> >> > > Thanks for driving this.
>> >> > > +1 for release 1.15.3
>> >> > >
>> >> > > Best regards,
>> >> > > Yuxia
>> >> > >
>> >> > > - 原始邮件 -
>> >> > > 发件人: "Leonard Xu" 
>> >> > > 收件人: "dev" 
>> >> > > 发送时间: 星期二, 2022年 10 月 25日 下午 10:00:47
>> >> > > 主题: Re: [DISCUSS] Release Flink 1.15.3
>> >> > >
>> >> > > Thanks Fabian for driving this.
>> >> > >
>> >> > > +1 to release 1.15.3.
>> >> > >
>> >> > > The bug tickets FLINK-26394 and FLINK-27148 should be fixed as well,
>> >> I’ll
>> >> > > help to address them soon.
>> >> > >
>> >> > > Best,
>> >> > > Leonard Xu
>> >> > >
>> >> > >
>> >> > >
>> >> > > > 2022年10月25日 下午8:28,Jing Ge  写道:
>> >> > > >
>> >> > > > +1 The timing is good to have 1.15.3 release. Thanks Fabian for
>> >> > bringing
>> >> > > > this to our attention.
>> >> > > >
>> >> > > > I just checked PRs and didn't find the 1.15 backport of FLINK-29567
>> >> > > > . Please be
>> >> aware
>> >> > of
>> >> > > it.
>> >> > > > Thanks!
>> >> > > >
>> >> > > > Best regards,
>> >> > > > Jing
>> >> > > >
>> >> > > > On Tue, Oct 25, 2022 at 11:44 AM Xintong Song <
>> >> tonysong...@gmail.com>
>> >> > > wrote:
>> >> > > >
>> >> > > >> Thanks for bringing this up, Fabian.
>> >> > > >>
>> >> > > >> +1 for creating a 1.15.3 release. I've also seen users requiring
>> >> this
>> >> > > >> version [1].
>> >> > > >>
>> >> > > >> I can help with actions that require a PMC role, if needed.
>> >> > > >>
>> >> > > >> Best,
>> >> > > >>
>> >> > > >> Xintong
>> >> > > >>
>> >> > > >>
>> >> > > >> [1]
>> >> https://lists.apache.org/thread/501q4l1c6gs8hwh433bw3v1y8fs9cw2n
>> >> > > >>
>> >> > > >>
>> >> > > >>
>> >> > > >> On Tue, Oct 25, 2022 at 5:11 PM Fabian Paul 
>> >> wrote:
>> >> > > >>
>> >> > > >>> Hi all,
>> >> > > >>>
>> >> > > >>> I want to start the discussion of creating a new 1.15 patch
>> >> release
>> >> > > >>> (1.15.3). The last 1.15 release is almost two months old, and
>> >> since
>> >> > > then,
>> >> > > >>> ~60 tickets have been closed, targeting 1.15.3. It includes
>> >> critical
>> >> > > >>> changes to the sink architecture, including:
>> >> > > >>>
>> >> > > >>> - Reverting the sink metric naming [1]
>> >> > > >>> - Recovery problems for sinks using the GlobalCommitter [2][3][4]
>> >> > > >>>
>> >> > > >>> If the community agrees to create a new patch release, I could
>> >> > > volunteer
>> >> > > >> as
>> >> > > >>> the release manager.
>> >> > > >>>
>> >> > > >>> Best,
>> >> > > >>> Fabian
>> >> > > >>>
>> >> > > >>> [1] https://issues.apache.org/jira/browse/FLINK-29567
>> >> > > >>> [2] https://issues.apache.org/jira/browse/FLINK-29509
>> >> > > >>> [3] https://issues.apache.org/jira/browse/FLINK-29512
>> >> > > >>> [4] https://issues.apache.org/jira/browse/FLINK-29627
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> >>
>> >> --
>> >> https://twitter.com/snntrable
>> >> https://github.com/knaufk
>> >>
>> >


Re: [VOTE] Release 1.15.3, release candidate #1

2022-11-15 Thread Fabian Paul
Hi again,

Unfortunately, in the initial email the links were not displayed correctly, so
please use the information below for testing.

Please review and vote on the release candidate #1 for the version 1.15.3,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:

- JIRA release notes [1],
- the official Apache source release and binary convenience releases to
be deployed to dist.apache.org [2], which are signed with the key with
fingerprint 90755B0A184BD9FFD22B6BE19D4F76C84EC11E37 [3],
- all artifacts to be deployed to the Maven Central Repository [4],
- source code tag "release-1.15.3-rc1" [5],
- website pull request listing the new release and adding announcement
blog post [6].

Best,
Fabian

[1] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352210
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.15.3-rc1
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1548
[5] https://github.com/apache/flink/tree/release-1.15.3-rc1
[6] https://github.com/apache/flink-web/pull/581

On Mon, Nov 14, 2022 at 8:45 AM Fabian Paul  wrote:
>
> Hi everyone,
>
> I am still looking for volunteers to validate the release. I'll extend
> the voting period by another 48hours, please try to give it some time.
>
> Best,
> Fabian
>
>
> On Thu, Nov 10, 2022 at 5:18 PM Fabian Paul  wrote:
> >
> > Hi everyone, Please review and vote on the release candidate #1 for the 
> > version 1.15.3, as follows: [ ] +1, Approve the release [ ] -1, Do not 
> > approve the release (please provide specific comments) The complete staging 
> > area is available for your review, which includes: - JIRA release notes 
> > [1], - the official Apache source release and binary convenience releases 
> > to be deployed to dist.apache.org [2], which are signed with the key with 
> > fingerprint 90755B0A184BD9FFD22B6BE19D4F76C84EC11E37 [3], - all artifacts 
> > to be deployed to the Maven Central Repository [4], - source code tag 
> > "release-1.15.3-rc1" [5], - website pull request listing the new release 
> > and adding announcement blog post [6]. The vote will be open for at least 
> > 72 hours. It is adopted by majority approval, with at least 3 PMC 
> > affirmative votes.
> >
> > Best, Fabian
> > [1] 
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352210
> >  [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.15.3-rc1
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS [4] 
> > https://repository.apache.org/content/repositories/orgapacheflink-1548 [5] 
> > https://github.com/apache/flink/tree/release-1.15.3-rc1 [6] 
> > https://github.com/apache/flink-web/pull/581


[Vote] Remove FlinkKafkaConsumer and graduate related Source APIs

2022-11-15 Thread Jing Ge
Hi,

As discussed on the mailing list [1], I'd like to start a vote to remove
FlinkKafkaConsumer and graduate the related Source APIs.

Since there are some ongoing developments that will change the SinkV2 APIs,
depending on how far they go, I might start another vote to remove
FlinkKafkaProducer before the code freeze of the 1.17 release.

Voting will be open for at least 72 hours.

Best regards,
Jing

[1] https://lists.apache.org/thread/m3o48c2d8j9g5t9s89hqs6qvr924s71o


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Maximilian Michels
Of course! Let me know if your concerns are addressed. The wiki page has
been updated.

>It will be great to add this in the FLIP so that reviewers can understand
how the source parallelisms are computed and how the algorithm works
end-to-end.

I've updated the FLIP page to add more details on how the backlog-based
scaling works (2).

>These metrics and configs are public API and need to be stable across
minor versions, could we document them before finalizing the FLIP?

Metrics and config changes are not strictly part of the public API but
Gyula has added a section.

-Max

On Tue, Nov 15, 2022 at 3:01 PM Dong Lin  wrote:

> Hi Maximilian,
>
> It seems that the following comments from the previous discussions have not
> been addressed yet. Any chance we can have them addressed before starting
> the voting thread?
>
> Thanks,
> Dong
>
> On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra  wrote:
>
> > Hi Dong!
> >
> > Let me try to answer the questions :)
> >
> > 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> > spent in the main record processing loop for an operator if I
> > understand correctly. This includes IO operations too.
> >
> > 2: We should add this to the FLIP I agree. It would be a Duration config
> > with the expected catch up time after rescaling (let's say 5 minutes). It
> > could be computed based on the current data rate and the calculated max
> > processing rate after the rescale.
> >
>
> It will be great to add this in the FLIP so that reviewers can understand
> how the source parallelisms are computed and how the algorithm works
> end-to-end.
>
>
> > 3: In the current proposal we don't have per operator configs. Target
> > utilization would apply to all operators uniformly.
> >
> > 4: It should be configurable, yes.
> >
>
> Since this config is a public API, could we update the FLIP accordingly to
> provide this config?
>
>
> >
> > 5,6: The names haven't been finalized but I think these are minor
> details.
> > We could add concrete names to the FLIP :)
> >
>
> These metrics and configs are public API and need to be stable across minor
> versions, could we document them before finalizing the FLIP?
>
>
> >
> > Cheers,
> > Gyula
> >
> >
> > On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
> >
> >> Hi Max,
> >>
> >> Thank you for the proposal. The proposal tackles a very important issue
> >> for Flink users and the design looks promising overall!
> >>
> >> I have some questions to better understand the proposed public
> interfaces
> >> and the algorithm.
> >>
> >> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
> >> could reach 1 sec. I believe this is mostly true for cpu-bound
> operators.
> >> Could you confirm that this can also be true for io-bound operators
> such as
> >> sinks? For example, suppose a Kafka Sink subtask has reached I/O
> bottleneck
> >> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
> >> reach 1 sec?
> >>
> >> 2) It is said that "users can configure a maximum time to fully process
> >> the backlog". The configuration section does not seem to provide this
> >> config. Could you specify this? And any chance this proposal can provide
> >> the formula for calculating the new processing rate?
> >>
> >> 3) How are users expected to specify the per-operator configs (e.g.
> >> target utilization)? For example, should users specify it
> programmatically
> >> in a DataStream/Table/SQL API?
> >>
> >> 4) How often will the Flink Kubernetes operator query metrics from
> >> JobManager? Is this configurable?
> >>
> >> 5) Could you specify the config name and default value for the proposed
> >> configs?
> >>
> >> 6) Could you add the name/mbean/type for the proposed metrics?
> >>
> >>
> >> Cheers,
> >> Dong
> >>
> >>
> >>
>
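Putting answers 1 and 2 above together, the backlog-based scaling arithmetic could be sketched roughly like this (an illustration only; the class and method names are hypothetical and not part of the FLIP):

```java
import java.time.Duration;

/** Rough sketch of FLIP-271-style scaling math; names are illustrative, not the final API. */
public class ScalingSketch {

    /**
     * Busy-time-normalized throughput: what one subtask could process at 100% utilization.
     * busyTimeMsPerSecond is the ms of each wall-clock second spent in the processing loop.
     */
    static double maxProcessingRate(double observedRecordsPerSecond, double busyTimeMsPerSecond) {
        return observedRecordsPerSecond / (busyTimeMsPerSecond / 1000.0);
    }

    /** Rate needed to keep up with new input and drain the backlog within the catch-up duration. */
    static double targetRate(double inputRatePerSecond, double backlogRecords, Duration catchUpTime) {
        return inputRatePerSecond + backlogRecords / (double) catchUpTime.getSeconds();
    }

    /** Parallelism required, given the per-subtask max processing rate. */
    static int requiredParallelism(double targetRate, double maxRatePerSubtask) {
        return (int) Math.ceil(targetRate / maxRatePerSubtask);
    }

    public static void main(String[] args) {
        // A subtask handles 1000 rec/s while busy 500 ms of every second => 2000 rec/s capacity.
        double perSubtaskCapacity = maxProcessingRate(1000, 500);
        // 3000 rec/s incoming plus a 900k-record backlog to drain within 5 minutes.
        double target = targetRate(3000, 900_000, Duration.ofMinutes(5));
        System.out.println(perSubtaskCapacity);                              // 2000.0
        System.out.println(target);                                          // 6000.0
        System.out.println(requiredParallelism(target, perSubtaskCapacity)); // 3
    }
}
```

This also illustrates why answer 1 matters: because busyTimeMsPerSecond counts IO as busy time, the same normalization applies to io-bound operators.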


[jira] [Created] (FLINK-30031) flink table store run abnormally when shade flink

2022-11-15 Thread zhenzhenhua (Jira)
zhenzhenhua created FLINK-30031:
---

 Summary: flink table store run abnormally when shade flink
 Key: FLINK-30031
 URL: https://issues.apache.org/jira/browse/FLINK-30031
 Project: Flink
  Issue Type: Bug
Affects Versions: table-store-0.2.1, 1.15.0
Reporter: zhenzhenhua


 

I am trying to sink into flink-table-store from Apache SeaTunnel; SeaTunnel uses
Flink version 1.13.6.

To avoid Flink version conflicts, I use the maven-shade-plugin to shade the
Flink dependencies.

However, running the built jar to write to flink-table-store fails with the
following exception:

 
{code:java}
Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/table/store/codegen/CodeGenerator
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at 
org.apache.flink.table.store.codegen.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:127)
        at 
org.apache.flink.table.store.codegen.ComponentClassLoader.loadClass(ComponentClassLoader.java:106)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at 
java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at 
org.apache.flink.table.store.codegen.CodeGenLoader.discover(CodeGenLoader.java:123)
        at 
org.apache.flink.table.store.codegen.CodeGenUtils.generateRecordComparator(CodeGenUtils.java:65)
        at 
org.apache.flink.table.store.file.utils.KeyComparatorSupplier.(KeyComparatorSupplier.java:40)
        at 
org.apache.flink.table.store.file.KeyValueFileStore.(KeyValueFileStore.java:59)
        at 
org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable.(ChangelogWithKeyFileStoreTable.java:103)
        at 
org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:72)
        at 
org.apache.flink.table.store.table.FileStoreTableFactory.create(FileStoreTableFactory.java:50)
        at org.example.TestWrite.main(TestWrite.java:24)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.table.store.codegen.CodeGenerator
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at 
org.apache.flink.table.store.codegen.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:127)
        at 
org.apache.flink.table.store.codegen.ComponentClassLoader.loadClass(ComponentClassLoader.java:106)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 27 more
 {code}
 

 
{code:xml}
<!-- pom.xml (tags reconstructed; they were stripped from the original mail) -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-store-core</artifactId>
        <version>0.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-store-format</artifactId>
        <version>0.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-jackson</artifactId>
        <version>2.12.1-13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <configuration>
                <!-- element name unclear in the original mail; "false" was the value -->
                <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <relocation>
                                <pattern>org.apache.flink</pattern>
                                <shadedPattern>shade.org.apache.flink</shadedPattern>
                                <excludes>
                                    <exclude>org.apache.flink.table.store.**</exclude>
                                </excludes>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
{code}


 
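One direction worth checking (a sketch, not a verified fix for this report): when classes are relocated, the `META-INF/services` files still name the original classes unless they are rewritten as well, which breaks `ServiceLoader`-based discovery like the `CodeGenLoader.discover` call in the stack trace above. The shade plugin's `ServicesResourceTransformer` rewrites those entries to match the relocation:

{code:xml}
<configuration>
    <transformers>
        <!-- Rewrites META-INF/services entries so ServiceLoader sees the relocated names -->
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
</configuration>
{code}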

Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

2022-11-15 Thread Lincoln Lee
Hi all,

Sorry for the late jump in this thread.

I agree with dawid that we should discuss api changes from the perspective
of proper endOfInput semantics, and I understand the motivation for this
api change and that it should make sense for potential user scenarios.

As a table/sql layer developer, I would also like to share some thoughts
and inputs(please correct me if my understanding is wrong):

I'd like to start with a question: why is no one complaining in eventtime
mode? Everyone seems to accept the behavior that when a bounded source ends in
eventtime mode, the system issues a LONG.MAX watermark from the source, and
all eventtime timers in downstream operators are then triggered at once. Take
a window operator: even though the window seems unfinished (the events do not
fill the expected window size), it is in fact finished (this includes the SQL
layer operators, which also force the window to close and output an
'incomplete' result).

Secondly, what exactly does endOfInput mean for a bounded source? I think it
represents the permanent end of the source, with no chance of continuation in
the future.

If we can agree on these clear semantics, then let's see whether there is a
fundamental difference between `proctime` and `eventtime`.
I think proctime should be simpler (from the SQL perspective, `proctime`
has no stricter semantics than `eventtime`).

So what I'm trying to say is: if it's acceptable for everyone to trigger all
pending eventtime timers directly at endOfInput in eventtime mode, why don't
we keep the same behavior for `proctime` by default?

Finally, we can discuss the user scenarios in which this default system
behavior is not sufficient and needs to be extended; I think that may move
this discussion along more smoothly.

Some additional input on how SQL layer operators currently depend on proctime
timers: the three types of operators I have seen so far are essentially
window-like, including interval join, over window, and group window. In the
current implementation they do nothing in the close phase (and do not
implement the finish method); the computation relies only on the watermark
trigger. If the underlying watermark handling is unified across `eventtime`
and `proctime`, the SQL layer operators will also benefit and keep consistent
behavior.

Best,
Lincoln Lee
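Lincoln's point about the LONG.MAX watermark can be illustrated with a toy timer model (a simplified sketch, not Flink's actual timer service): advancing the watermark to Long.MAX_VALUE at end of input fires every remaining event-time timer, and the question in this FLIP is essentially whether processing-time timers should get analogous treatment by default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of an event-time timer service; not Flink's actual internals. */
public class TimerModel {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advancing the watermark fires every registered timer at or before it. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
    }

    List<Long> fired() {
        return fired;
    }

    public static void main(String[] args) {
        TimerModel model = new TimerModel();
        model.registerEventTimeTimer(100);
        model.registerEventTimeTimer(2000);
        model.advanceWatermark(500);            // fires only the timer at 100
        model.advanceWatermark(Long.MAX_VALUE); // end of input: fires everything left
        System.out.println(model.fired());      // [100, 2000]
    }
}
```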


Dong Lin  于2022年11月15日周二 17:48写道:

> Thank you Yun for the detailed explanation!
>
> Since this FLIP can add quite some complexity to Flink, it will be really
> useful to understand the concrete use-cases for the proposed changes so
> that we can identify the approach with minimum extra complexity. We can
> probably continue the discussion after the FLIP is updated with
> the use-cases.
>
> Please see my comments inline.
>
> On Tue, Nov 15, 2022 at 4:18 PM Yun Gao 
> wrote:
>
> > Hi Dong,
> > Very thanks for the discussion!
> > > It appears that the issues mentioned in the motivation section
> > > are all related to using Windows on the DataStream API, where
> > > the user's code typically does not have anything to do with Timer.
> > IMO it is not bound only to the case of window operators. For
> > example, users might implement some complex aggregation logic
> > with ProcessFunction directly. In this case, users might also want to
> > control how these timers are handled at the end of the stream.
>
>
>
>
> > > IMO, the main benefit of this alternative solution is that it is more
> > > consistent with the existing Windows API. Users who are concerned
> > > with firing windows on end-of-stream won't need to additionally
> > > understand/handle timer.
> > First, to summarize the problem: currently it can be divided into two
> > layers:
> > 1. In the lower layer we need to support different actions for dealing with
> > the timers at the end of the stream (in fact we need to deduce whether we
> > need this ability from the API, but for simplicity I'll describe this layer
> > first, since the divergence happens at the higher level).
> > 2. How do we let users specify the actions for these timers? Currently
> > we have different options at this layer.
> >  - The first option is to have a unified SingleOperatorStream#
> >  setTimerTerminationAction.
> >  - The second option is to have a specialized trigger for the window.
> > With whichever interface, the window operator needs to set the proper
> > termination actions according to the specified semantics when registering
> > timers.
> > On the other side, specifically for the WindowOperator, the interface might
> > not only be related to timers: there are also window types, e.g.
> > CountWindow,
> > that also need to specify the behavior at the end of the stream.
> > Therefore, for window operators it looks to me that a uniform API would
> > indeed be more user-friendly. Since different operators might have
> > different
> > situations, I wonder whether it would be better if, first:
> > 1. The operator authors could still set the default actions 

Re: Env Vars in the Flink Web UI

2022-11-15 Thread Konstantin Knauf
Hi everyone,

important correction, this is since 1.16.0, not 1.17+.

Best,

Konstantin

Am Di., 15. Nov. 2022 um 14:25 Uhr schrieb Gyula Fóra :

> Thanks for bringing this important issue to discussion Konstantin!
>
> I am in favor of not showing them by default with an optional configuration
> to enable it.
> Otherwise this poses a big security risk of exposing previously hidden
> information after upgrade.
>
> Gyula
>
> On Tue, Nov 15, 2022 at 2:15 PM Maximilian Michels  wrote:
>
> > Hey Konstantin,
> >
> > I'd be in favor of not printing them at all, i.e. option (d). We have the
> > configuration page which lists the effective config and already removes
> any
> > known secrets.
> >
> > -Max
> >
> > On Tue, Nov 15, 2022 at 11:26 AM Konstantin Knauf 
> > wrote:
> >
> > > Hi all,
> > >
> > > since Flink 1.17 [1] the Flink Web UI prints *all* environment
> variables
> > of
> > > the Taskmanager or Jobmanagers hosts (Jobmanager -> Configuration ->
> > > Environment). Given that environment variables are often used to store
> > > sensitive information, I think it is wrong and dangerous to print
> those
> > in
> > > the Flink Web UI. Specifically, thinking about how Kubernetes Secrets
> are
> > > usually injected into Pods.
> > >
> > > One could argue that anyone who can submit a Flink Job to a cluster has
> > > access to these environment variables anyway, but not everyone who has
> > > access to the Flink UI can submit a Flink Job.
> > >
> > > I see the following options:
> > > a) leave as is
> > > b) apply same obfuscation as in flink-conf.yaml based on some heuristic
> > (no
> > > "secret", "password" in env var name)
> > > c) only print allow-listed values
> > > d) don't print any env vars in the web UI (at least by default)
> > >
> > > What do you think?
> > >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-28311
> > >
> > > --
> > > https://twitter.com/snntrable
> > > https://github.com/knaufk
> > >
> >
>


-- 
https://twitter.com/snntrable
https://github.com/knaufk


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Dong Lin
Hi Maximilian,

It seems that the following comments from the previous discussions have not
been addressed yet. Any chance we can have them addressed before starting
the voting thread?

Thanks,
Dong

On Mon, Nov 7, 2022 at 2:33 AM Gyula Fóra  wrote:

> Hi Dong!
>
> Let me try to answer the questions :)
>
> 1 : busyTimeMsPerSecond is not specific for CPU, it measures the time
> spent in the main record processing loop for an operator if I
> understand correctly. This includes IO operations too.
>
> 2: We should add this to the FLIP I agree. It would be a Duration config
> with the expected catch up time after rescaling (let's say 5 minutes). It
> could be computed based on the current data rate and the calculated max
> processing rate after the rescale.
>

It will be great to add this in the FLIP so that reviewers can understand
how the source parallelisms are computed and how the algorithm works
end-to-end.


> 3: In the current proposal we don't have per operator configs. Target
> utilization would apply to all operators uniformly.
>
> 4: It should be configurable, yes.
>

Since this config is a public API, could we update the FLIP accordingly to
provide this config?


>
> 5,6: The names haven't been finalized but I think these are minor details.
> We could add concrete names to the FLIP :)
>

These metrics and configs are public API and need to be stable across minor
versions, could we document them before finalizing the FLIP?


>
> Cheers,
> Gyula
>
>
> On Sun, Nov 6, 2022 at 5:19 PM Dong Lin  wrote:
>
>> Hi Max,
>>
>> Thank you for the proposal. The proposal tackles a very important issue
>> for Flink users and the design looks promising overall!
>>
>> I have some questions to better understand the proposed public interfaces
>> and the algorithm.
>>
>> 1) The proposal seems to assume that the operator's busyTimeMsPerSecond
>> could reach 1 sec. I believe this is mostly true for cpu-bound operators.
>> Could you confirm that this can also be true for io-bound operators such as
>> sinks? For example, suppose a Kafka Sink subtask has reached I/O bottleneck
>> when flushing data out to the Kafka clusters, will busyTimeMsPerSecond
>> reach 1 sec?
>>
>> 2) It is said that "users can configure a maximum time to fully process
>> the backlog". The configuration section does not seem to provide this
>> config. Could you specify this? And any chance this proposal can provide
>> the formula for calculating the new processing rate?
>>
>> 3) How are users expected to specify the per-operator configs (e.g.
>> target utilization)? For example, should users specify it programmatically
>> in a DataStream/Table/SQL API?
>>
>> 4) How often will the Flink Kubernetes operator query metrics from
>> JobManager? Is this configurable?
>>
>> 5) Could you specify the config name and default value for the proposed
>> configs?
>>
>> 6) Could you add the name/mbean/type for the proposed metrics?
>>
>>
>> Cheers,
>> Dong
>>
>>
>>


Re: Env Vars in the Flink Web UI

2022-11-15 Thread Gyula Fóra
Thanks for bringing this important issue to discussion Konstantin!

I am in favor of not showing them by default with an optional configuration
to enable it.
Otherwise this poses a big security risk of exposing previously hidden
information after upgrade.

Gyula

On Tue, Nov 15, 2022 at 2:15 PM Maximilian Michels  wrote:

> Hey Konstantin,
>
> I'd be in favor of not printing them at all, i.e. option (d). We have the
> configuration page which lists the effective config and already removes any
> known secrets.
>
> -Max
>
> On Tue, Nov 15, 2022 at 11:26 AM Konstantin Knauf 
> wrote:
>
> > Hi all,
> >
> > since Flink 1.17 [1] the Flink Web UI prints *all* environment variables
> of
> > the Taskmanager or Jobmanagers hosts (Jobmanager -> Configuration ->
> > Environment). Given that environment variables are often used to store
> > sensitive information, I think it is wrong and dangerous to print those
> in
> > the Flink Web UI. Specifically, thinking about how Kubernetes Secrets are
> > usually injected into Pods.
> >
> > One could argue that anyone who can submit a Flink Job to a cluster has
> > access to these environment variables anyway, but not everyone who has
> > access to the Flink UI can submit a Flink Job.
> >
> > I see the following options:
> > a) leave as is
> > b) apply same obfuscation as in flink-conf.yaml based on some heuristic
> (no
> > "secret", "password" in env var name)
> > c) only print allow-listed values
> > d) don't print any env vars in the web UI (at least by default)
> >
> > What do you think?
> >
> > Cheers,
> >
> > Konstantin
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-28311
> >
> > --
> > https://twitter.com/snntrable
> > https://github.com/knaufk
> >
>


Re: Env Vars in the Flink Web UI

2022-11-15 Thread Maximilian Michels
Hey Konstantin,

I'd be in favor of not printing them at all, i.e. option (d). We have the
configuration page which lists the effective config and already removes any
known secrets.

-Max

On Tue, Nov 15, 2022 at 11:26 AM Konstantin Knauf  wrote:

> Hi all,
>
> since Flink 1.17 [1] the Flink Web UI prints *all* environment variables of
> the Taskmanager or Jobmanagers hosts (Jobmanager -> Configuration ->
> Environment). Given that environment variables are often used to store
> sensitive information, I think it is wrong and dangerous to print those in
> the Flink Web UI. Specifically, thinking about how Kubernetes Secrets are
> usually injected into Pods.
>
> One could argue that anyone who can submit a Flink Job to a cluster has
> access to these environment variables anyway, but not everyone who has
> access to the Flink UI can submit a Flink Job.
>
> I see the following options:
> a) leave as is
> b) apply same obfuscation as in flink-conf.yaml based on some heuristic (no
> "secret", "password" in env var name)
> c) only print allow-listed values
> d) don't print any env vars in the web UI (at least by default)
>
> What do you think?
>
> Cheers,
>
> Konstantin
>
> [1] https://issues.apache.org/jira/browse/FLINK-28311
>
> --
> https://twitter.com/snntrable
> https://github.com/knaufk
>
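For intuition on option (b), a minimal sketch of such a masking heuristic might look like the following (the class name and hint list are made up here; this is not Flink's actual obfuscation logic):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of option (b): heuristically mask sensitive-looking environment variables. */
public class EnvVarMasker {
    // Substrings that mark a variable name as sensitive; the list is illustrative only.
    private static final String[] HINTS = {"secret", "password", "token", "credential"};

    static boolean isSensitive(String name) {
        String lower = name.toLowerCase();
        for (String hint : HINTS) {
            if (lower.contains(hint)) {
                return true;
            }
        }
        return false;
    }

    /** Returns a copy of the environment with values of sensitive-looking variables replaced. */
    static Map<String, String> mask(Map<String, String> env) {
        Map<String, String> masked = new HashMap<>();
        env.forEach((name, value) -> masked.put(name, isSensitive(name) ? "******" : value));
        return masked;
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("DB_PASSWORD", "hunter2", "HOME", "/root");
        System.out.println(mask(env).get("DB_PASSWORD")); // ******
        System.out.println(mask(env).get("HOME"));        // /root
    }
}
```

As the thread notes, any such allow/deny heuristic can miss variables, which is an argument for option (d) as the default.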


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Gyula Fóra
I agree we should start the vote.

On a separate (but related) small discussion we could also decide
backporting https://issues.apache.org/jira/browse/FLINK-29501 for 1.16.1 so
that the autoscaler could be more efficiently developed and tested and to
make it 1.16 compatible.

Cheers,
Gyula

On Tue, Nov 15, 2022 at 2:09 PM Maximilian Michels  wrote:

> +1 If there are no further comments, I'll start a vote thread in the next
> few days.
>
> -Max
>
>
> On Tue, Nov 15, 2022 at 2:06 PM Zheng Yu Chen  wrote:
>
> > @Gyula Good news: FLIP-256 is now finished and merged.
> > The FLIP-271 discussion seems to have stopped, and I wonder if there are any
> > other comments. Can we move to a vote and start this exciting feature?
> > Maybe I can get involved in developing this feature.
> >
> >
> >
> > Gyula Fóra  于2022年11月8日周二 18:46写道:
> >
> > > I had 2 extra comments to Max's reply:
> > >
> > > 1. About pre-allocating resources:
> > > This could be done through the operator when the standalone deployment
> > mode
> > > is used relatively easily as there we have better control of
> > > pods/resources.
> > >
> > > 2. Session jobs:
> > > There is a FLIP (
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> > > )
> > > to support passing configuration when we submit jobs to the session
> > cluster
> > > through the rest api. Once that goes through, session jobs can also be
> > > scaled in a similar way through the configuration.
> > >
> > > Cheers,
> > > Gyula
> > >
> > >
> > > On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels 
> > wrote:
> > >
> > > > @Yang
> > > >
> > > > >Since the current auto-scaling needs to fully redeploy the
> > application,
> > > it
> > > > may fail to start due to lack of resources.
> > > >
> > > > Great suggestions. I agree that we will have to preallocate /
> > > > reserve resources to ensure the rescaling doesn't take longer than
> > > expected.
> > > > This is not only a problem when scaling up but also when scaling down
> > > > because any pods surrendered might be taken over by another
> deployment
> > > > during the rescaling. This would certainly be a case for integrating
> > > > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > > > rescaling API. Alternatively, the operator would have to reserve the
> > > > resources somehow.
> > > >
> > > > >Does auto-scaling have a plan to support jobs which are running in a
> > > > session cluster? It might be a different
> > > >
> > > > We are targeting the application deployment mode for the first
> version
> > > but
> > > > the standalone mode can be supported as soon as we have an
> integration
> > > with
> > > > the scheduler.
> > > >
> > > > > # Horizontal scaling V.S. Vertical scaling
> > > >
> > > > True. We left out vertical scaling intentionally. For now we assume
> > CPU /
> > > > memory is set up by the user. While definitely useful, vertical
> scaling
> > > > adds another dimension to the scaling problem which we wanted to
> tackle
> > > > later. I'll update the FLIP to explicitly state that.
> > > >
> > > > -Max
> > > >
> > > >
> > > >
> > > > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang 
> > wrote:
> > > >
> > > > > Thanks for the fruitful discussion and I am really excited to see
> > that
> > > > the
> > > > > auto-scaling really happens for
> > > > >
> > > > > Flink Kubernetes operator. It will be a very important step to make
> > the
> > > > > long-running Flink job run more smoothly.
> > > > >
> > > > > I just have some immature ideas and want to share them here.
> > > > >
> > > > >
> > > > > # Resource Reservation
> > > > >
> > > > > Since the current auto-scaling needs to fully redeploy the
> > application,
> > > > it
> > > > > may fail to start due to lack of resources.
> > > > >
> > > > > I know the Kubernetes operator could rollback to the old spec, but
> we
> > > > still
> > > > > waste a lot of time, making things worse.
> > > > >
> > > > > I hope the FLIP-250[1](support customized K8s scheduler) could help
> > in
> > > > this
> > > > > case.
> > > > >
> > > > >
> > > > > # Session cluster
> > > > >
> > > > > Does auto-scaling have a plan to support jobs which are running in
> a
> > > > > session cluster? It might be a different
> > > > >
> > > > > story since we could not use Flink config options to override the
> > job
> > > > > vertex parallelisms. Given that the SessionJob
> > > > >
> > > > > is also a first-class citizen, we need to document the limitation
> if
> > > not
> > > > > support.
> > > > >
> > > > >
> > > > > # Horizontal scaling V.S. Vertical scaling
> > > > >
> > > > > IIUC, the current proposal does not mention vertical scaling. There
> > > might
> > > > > be a chance that the memory/cpu of
> > > > >
> > > > > TaskManager is not configured properly. And this will cause
> > unnecessary
> > > > > multiple scaling executions.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > [1].
> > > > >
> > > > 

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Maximilian Michels
+1 If there are no further comments, I'll start a vote thread in the next
few days.

-Max


On Tue, Nov 15, 2022 at 2:06 PM Zheng Yu Chen  wrote:

> @Gyula Good news: FLIP-256 is now finished and merged.
> The FLIP-271 discussion seems to have stopped, and I wonder if there are any
> other comments. Can we move to a vote and start this exciting feature?
> Maybe I can get involved in developing this feature.
>
>
>
> Gyula Fóra  于2022年11月8日周二 18:46写道:
>
> > I had 2 extra comments to Max's reply:
> >
> > 1. About pre-allocating resources:
> > This could be done through the operator when the standalone deployment
> mode
> > is used relatively easily as there we have better control of
> > pods/resources.
> >
> > 2. Session jobs:
> > There is a FLIP (
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> > )
> > to support passing configuration when we submit jobs to the session
> cluster
> > through the rest api. Once that goes through, session jobs can also be
> > scaled in a similar way through the configuration.
> >
> > Cheers,
> > Gyula
> >
> >
> > On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels 
> wrote:
> >
> > > @Yang
> > >
> > > >Since the current auto-scaling needs to fully redeploy the
> application,
> > it
> > > may fail to start due to lack of resources.
> > >
> > > Great suggestions. I agree that we will have to preallocate /
> > > reserve resources to ensure the rescaling doesn't take longer than
> > expected.
> > > This is not only a problem when scaling up but also when scaling down
> > > because any pods surrendered might be taken over by another deployment
> > > during the rescaling. This would certainly be a case for integrating
> > > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > > rescaling API. Alternatively, the operator would have to reserve the
> > > resources somehow.
> > >
> > > >Does auto-scaling have a plan to support jobs which are running in a
> > > session cluster? It might be a different
> > >
> > > We are targeting the application deployment mode for the first version
> > but
> > > the standalone mode can be supported as soon as we have an integration
> > with
> > > the scheduler.
> > >
> > > > # Horizontal scaling V.S. Vertical scaling
> > >
> > > True. We left out vertical scaling intentionally. For now we assume
> CPU /
> > > memory is set up by the user. While definitely useful, vertical scaling
> > > adds another dimension to the scaling problem which we wanted to tackle
> > > later. I'll update the FLIP to explicitly state that.
> > >
> > > -Max
> > >
> > >
> > >
> > > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang 
> wrote:
> > >
> > > > Thanks for the fruitful discussion and I am really excited to see
> that
> > > the
> > > > auto-scaling really happens for
> > > >
> > > > Flink Kubernetes operator. It will be a very important step to make
> the
> > > > long-running Flink job run more smoothly.
> > > >
> > > > I just have some immature ideas and want to share them here.
> > > >
> > > >
> > > > # Resource Reservation
> > > >
> > > > Since the current auto-scaling needs to fully redeploy the
> application,
> > > it
> > > > may fail to start due to lack of resources.
> > > >
> > > > I know the Kubernetes operator could rollback to the old spec, but we
> > > still
> > > > waste a lot of time, making things worse.
> > > >
> > > > I hope the FLIP-250[1](support customized K8s scheduler) could help
> in
> > > this
> > > > case.
> > > >
> > > >
> > > > # Session cluster
> > > >
> > > > Does auto-scaling have a plan to support jobs which are running in a
> > > > session cluster? It might be a different
> > > >
> > > > story since we could not use Flink config options to override the
> job
> > > > vertex parallelisms. Given that the SessionJob
> > > >
> > > > is also a first-class citizen, we need to document the limitation if
> > not
> > > > support.
> > > >
> > > >
> > > > # Horizontal scaling V.S. Vertical scaling
> > > >
> > > > IIUC, the current proposal does not mention vertical scaling. There
> > might
> > > > be a chance that the memory/cpu of
> > > >
> > > > TaskManager is not configured properly. And this will cause
> unnecessary
> > > > multiple scaling executions.
> > > >
> > > >
> > > >
> > > >
> > > > [1].
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> > > >
> > > >
> > > >
> > > > Best,
> > > >
> > > > Yang
> > > >
> > > > Maximilian Michels  于2022年11月8日周二 00:31写道:
> > > >
> > > > > Thanks for all the interest here and for the great remarks! Gyula
> > > > > already did a great job addressing the questions here. Let me try
> to
> > > > > add additional context:
> > > > >
> > > > > @Biao Geng:
> > > > >
> > > > > >1.  For source parallelisms, if the user configure a much larger
> > value
> > > > > than normal, there should be very little pending records though it
> is
> 

Re: [jira] [Created] (FLINK-29856) Triggering savepoint does not trigger source operator checkpoint

2022-11-15 Thread Maximilian Michels
This sounds like a pretty serious issue. Has anybody looked into this
already?

-Max

On Thu, Nov 3, 2022 at 3:51 AM Mason Chen (Jira)  wrote:

> Mason Chen created FLINK-29856:
> --
>
>  Summary: Triggering savepoint does not trigger source
> operator checkpoint
>  Key: FLINK-29856
>  URL: https://issues.apache.org/jira/browse/FLINK-29856
>  Project: Flink
>   Issue Type: Improvement
>   Components: Runtime / Checkpointing
> Affects Versions: 1.16.0
> Reporter: Mason Chen
>
>
> When I trigger a savepoint with the Flink K8s operator, I verified that two
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke
> snapshotState or notifyCheckpointComplete. This is easily reproducible in a
> simple pipeline (e.g. KafkaSource -> print).
>
>
>
> However, when the checkpoint occurs via the interval, I do see the sources
> checkpointing properly and expected logs in the output.
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-15 Thread Zheng Yu Chen
@Gyula  Good news: FLIP-256 is now finished and merged.
The FLIP-271 discussion seems to have stopped, and I wonder if there are any
other comments. Can we move to a vote and start this exciting feature?
Maybe I can get involved in developing it.



Gyula Fóra  于2022年11月8日周二 18:46写道:

> I had 2 extra comments to Max's reply:
>
> 1. About pre-allocating resources:
> This could be done through the operator when the standalone deployment mode
> is used relatively easily as there we have better control of
> pods/resources.
>
> 2. Session jobs:
> There is a FLIP (
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-256%3A+Support+Job+Dynamic+Parameter+With+Flink+Rest+Api
> )
> to support passing configuration when we submit jobs to the session cluster
> through the rest api. Once that goes through, session jobs can also be
> scaled in a similar way through the configuration.
>
> Cheers,
> Gyula
>
>
> On Tue, Nov 8, 2022 at 11:39 AM Maximilian Michels  wrote:
>
> > @Yang
> >
> > >Since the current auto-scaling needs to fully redeploy the application,
> it
> > may fail to start due to lack of resources.
> >
> > Great suggestions. I agree that we will have to preallocate /
> > reserve resources to ensure the rescaling doesn't take longer than
> expected.
> > This is not only a problem when scaling up but also when scaling down
> > because any pods surrendered might be taken over by another deployment
> > during the rescaling. This would certainly be a case for integrating
> > autoscaling with the Flink scheduler, e.g. via FLIP-250 or via the
> > rescaling API. Alternatively, the operator would have to reserve the
> > resources somehow.
> >
> > >Does auto-scaling have a plan to support jobs which are running in a
> > session cluster? It might be a different
> >
> > We are targeting the application deployment mode for the first version
> but
> > the standalone mode can be supported as soon as we have an integration
> with
> > the scheduler.
> >
> > > # Horizontal scaling V.S. Vertical scaling
> >
> > True. We left out vertical scaling intentionally. For now we assume CPU /
> > memory is set up by the user. While definitely useful, vertical scaling
> > adds another dimension to the scaling problem which we wanted to tackle
> > later. I'll update the FLIP to explicitly state that.
> >
> > -Max
> >
> >
> >
> > On Tue, Nov 8, 2022 at 3:59 AM Yang Wang  wrote:
> >
> > > Thanks for the fruitful discussion and I am really excited to see that
> > the
> > > auto-scaling really happens for
> > >
> > > Flink Kubernetes operator. It will be a very important step to make the
> > > long-running Flink job run more smoothly.
> > >
> > > I just have some immature ideas and want to share them here.
> > >
> > >
> > > # Resource Reservation
> > >
> > > Since the current auto-scaling needs to fully redeploy the application,
> > it
> > > may fail to start due to lack of resources.
> > >
> > > I know the Kubernetes operator could rollback to the old spec, but we
> > still
> > waste a lot of time, making things worse.
> > >
> > > I hope the FLIP-250[1](support customized K8s scheduler) could help in
> > this
> > > case.
> > >
> > >
> > > # Session cluster
> > >
> > > Does auto-scaling have a plan to support jobs which are running in a
> > > session cluster? It might be a different
> > >
> > > story since we could not use Flink config options to override the  job
> > > vertex parallelisms. Given that the SessionJob
> > >
> > > is also a first-class citizen, we need to document the limitation if
> not
> > > support.
> > >
> > >
> > > # Horizontal scaling V.S. Vertical scaling
> > >
> > > IIUC, the current proposal does not mention vertical scaling. There
> might
> > > be a chance that the memory/cpu of
> > >
> > > TaskManager is not configured properly. And this will cause unnecessary
> > > multiple scaling executions.
> > >
> > >
> > >
> > >
> > > [1].
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> > >
> > >
> > >
> > > Best,
> > >
> > > Yang
> > >
> > > Maximilian Michels  于2022年11月8日周二 00:31写道:
> > >
> > > > Thanks for all the interest here and for the great remarks! Gyula
> > > > already did a great job addressing the questions here. Let me try to
> > > > add additional context:
> > > >
> > > > @Biao Geng:
> > > >
> > > > >1.  For source parallelisms, if the user configure a much larger
> value
> > > > than normal, there should be very little pending records though it is
> > > > possible to get optimized. But IIUC, in current algorithm, we will
> not
> > > take
> > > > actions for this case as the backlog growth rate is almost zero. Is
> the
> > > > understanding right?
> > > >
> > > > This is actually a corner case which we haven't exactly described in
> > > > the FLIP yet. Sources are assumed to only be scaled according to the
> > > > backlog but if there is zero backlog, we don't have a number to
> > > > compute the 

[jira] [Created] (FLINK-30030) Unexpected behavior for overwrite in Hive dialect

2022-11-15 Thread luoyuxia (Jira)
luoyuxia created FLINK-30030:


 Summary: Unexpected behavior for overwrite in Hive dialect
 Key: FLINK-30030
 URL: https://issues.apache.org/jira/browse/FLINK-30030
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.15.0
Reporter: luoyuxia


When overwriting a table in Hive dialect, it might not overwrite but append instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Understanding the Internals of Watermark Generation | Actual Implementation

2022-11-15 Thread Tawfek Yasser Tawfek
Dear Flink Developers,

Hope all is well,

I'm trying to understand how Flink generates and emits watermarks at runtime.
I think that the TimestampsAndWatermarksOperator class is responsible for 
assigning timestamps and generating watermarks,
based on the watermark strategy specified in the DataStream API.

If this is correct, how could I learn how this operator works? Otherwise, 
can you please guide me on where I should look to understand the watermark 
generation process?

Thank you so much.
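For context, the bookkeeping that a bounded-out-of-orderness WatermarkGenerator performs (the part TimestampsAndWatermarksOperator drives) can be modeled in a few lines of plain Java. This is an illustrative, self-contained sketch, not Flink's actual implementation:

```java
// Illustrative model of WatermarkGenerator bookkeeping (not Flink code):
// track the maximum timestamp seen so far and, on each periodic emit,
// produce maxTimestamp - outOfOrderness - 1 as the watermark.
public class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE + 1;

    public BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    // Analogous to WatermarkGenerator#onEvent, called for every record.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Analogous to WatermarkGenerator#onPeriodicEmit, which the operator
    // drives from a periodically registered processing-time timer.
    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```

In Flink, the periodic emit is driven by the configured auto-watermark interval; the operator then forwards the generated watermark downstream.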


Best Regards.



 Tawfik Yasser

 Teaching Assistant, Nile University | M.Sc. INF

 Artificial Intelligence Program, ITCS

 26th of July Corridor, Sheikh Zayed City, Giza, Egypt

 www.nu.edu.eg



Env Vars in the Flink Web UI

2022-11-15 Thread Konstantin Knauf
Hi all,

since Flink 1.17 [1] the Flink Web UI prints *all* environment variables of
the TaskManager or JobManager hosts (Jobmanager -> Configuration ->
Environment). Given that environment variables are often used to store
sensitive information, I think, it is wrong and dangerous to print those in
the Flink Web UI. Specifically, thinking about how Kubernetes Secrets are
usually injected into Pods.

One could argue that anyone who can submit a Flink Job to a cluster has
access to these environment variables anyway, but not everyone who has
access to the Flink UI can submit a Flink Job.

I see the following options:
a) leave as is
b) apply same obfuscation as in flink-conf.yaml based on some heuristic (no
"secret", "password" in env var name)
c) only print allow-listed values
d) don't print any env vars in the web UI (at least by default)
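For option b), a minimal sketch of such a name-based heuristic, similar in spirit to how sensitive flink-conf.yaml values are hidden. The substring list and mask string here are illustrative assumptions, not Flink's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EnvVarMasker {
    // Heuristic: treat a variable as sensitive if its name contains one of
    // these substrings (illustrative list; a real deployment might prefer
    // an explicit allow-list instead).
    private static final String[] SENSITIVE_HINTS = {"secret", "password", "token", "key"};

    public static boolean isSensitive(String name) {
        String lower = name.toLowerCase();
        for (String hint : SENSITIVE_HINTS) {
            if (lower.contains(hint)) {
                return true;
            }
        }
        return false;
    }

    // Returns a copy of the environment with sensitive values obfuscated.
    public static Map<String, String> mask(Map<String, String> env) {
        Map<String, String> masked = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : env.entrySet()) {
            masked.put(e.getKey(), isSensitive(e.getKey()) ? "******" : e.getValue());
        }
        return masked;
    }
}
```

The obvious weakness of any such heuristic is false negatives (e.g. a secret stored in a variable named `DB_CONN`), which is an argument for options c) or d).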

What do you think?

Cheers,

Konstantin

[1] https://issues.apache.org/jira/browse/FLINK-28311

-- 
https://twitter.com/snntrable
https://github.com/knaufk


[jira] [Created] (FLINK-30029) Notice/ShadeParser parse version/classifier separately

2022-11-15 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-30029:


 Summary: Notice/ShadeParser parse version/classifier separately
 Key: FLINK-30029
 URL: https://issues.apache.org/jira/browse/FLINK-30029
 Project: Flink
  Issue Type: Sub-task
  Components: Build System / CI
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[SUMMARY] Flink 1.17 Release Sync 11/15/2022

2022-11-15 Thread Leonard Xu
Hi devs and users,

I’d like to share some highlights about the 1.17 release sync on 11/15/2022.

- Release tracking page:
 - The community has collected some great features on the 1.17 page[1]
 - @committers Please continuously update the page in the coming week
 
- JIRA account apply :
  - Martijn updated the issue tracking flow[2][3]
  - Users without a JIRA account can follow these docs[2][3] to apply for an 
account and join as Flink contributors

- Blockers:
- Blocker FLINK-29387 has been fixed
- PR for blocker FLINK-29315 is opened and waiting for review.
- Blocker FLINK-29818 is reopened, Yang Wang is looking into this ticket

- Build stability: a growing number of test stability issues with “Exit code 137 
errors”
- Matthias and Qingsheng investigated the memory issue caused by multiple 
Azure agents on one machine using too many resources
- We’ve reduced the number of agents from 7 to 5; let’s keep an eye on this 
issue.
- Leonard offered a workaround to skip the clickable-link issue in the Slack 
#builds channel

The next release sync will be on November 29th, 2022.

Google Meet: https://meet.google.com/wcx-fjbt-hhz
Dial-in: https://tel.meet/wcx-fjbt-hhz?pin=1940846765126  

Best regards,
Martijn, Qingsheng, Matthias and Leonard 

[1] https://cwiki.apache.org/confluence/display/FLINK/1.17+Release
[2] https://flink.apache.org/community.html
[3] https://flink.apache.org/zh/community.html



Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

2022-11-15 Thread Dong Lin
Thank you Yun for the detailed explanation!

Since this FLIP can add quite some complexity to Flink, it will be really
useful to understand the concrete use-cases for the proposed changes so
that we can identify the approach with minimum extra complexity. We can
probably continue the discussion after the FLIP is updated with
the use-cases.

Please see my comments inline.

On Tue, Nov 15, 2022 at 4:18 PM Yun Gao 
wrote:

> Hi Dong,
> Very thanks for the discussion!
> > It appears that the issues mentioned in the motivation section
> > are all related to using Windows on the DataStream API, where
> > the user's code typically does not have anything to do with Timer.
> IMO it might not be bound only to the case of window operators. For
> example, users might implement some complex aggregation logic
> with ProcessFunction directly. In this case, users might also want to
> control how these timers are handled at the end of the stream.




> > IMO, the main benefit of this alternative solution is that it is more
> > consistent with the existing Windows API. Users who are concerned
> > with firing windows on end-of-stream won't need to additionally
> > understand/handle timer.
> First, to summarize the problem: currently it could be divided into two
> layers:
> 1. In the lower layer we need to support different actions to deal with
> the timers at the end of the stream (though in fact we need to deduce
> whether we need this ability from the API, but for simplicity I'll first
> describe
> this layer since the divergence happens at the higher level).
> 2. How do we let users specify the actions at the end of the timers?
> Currently
> we have different options on this layer.
>  - The first option is to have a unified SingleOperatorStream#
>  setTimerTerminationAction.
>  - The second option is to have a specialized trigger for the window.
> With whichever interface, the window operator needs to set proper
> termination actions according to the specified semantics when registering
> timers.
> On the other side, specifically for the WindowOperator, the interface might
> not
> relate only to the timers; there are also window types, e.g. CountWindow,
>  that also need to specify the behavior at the end of the stream.
> Therefore, for window operators it looks to me it would indeed be more
> friendly
> to users to have a uniform API. Since different operators might have
> different
> situations, I wonder if it would be better to first:
> 1. Operator authors could still set the default actions when
> registering timers.
> 2. Each operator considers its API distinctly.
>  - The window operator provides a uniform API.
>  - Except for windows, currently it looks to me that users could register
> customized
>  timers only with the family of ProcessFunctions. Users could still set
> actions for
>  each timer, and we may at first only provide a method for ProcessOperator to
> change
>  the per-timer actions uniformly when building the DAG?
> > we need the task thread to be blocked until the timer gets triggered on
> the registered time
> > point.
> Currently I do not have real-life scenarios, but some anticipated cases
> are:
> - Users want the job stopped at the boundary of windows when stopping the
> job with savepoint --drain.
>

Hmm... I guess you mean the processing time window in this scenario. It is
not clear to me why users would want to block waiting for wallclock time to
pass instead of stopping the job immediately.

- Users have timers to emit messages to external systems periodically, and
> users want to have one final
> message at the end of the stream.


IMO, an alternative solution for this use-case is to allow users to specify
what to do at the end of the input, rather than specifying what to do with
timers at the end of time.


> But I also think we could add more actions step-by-step.
> > I might have missed use-cases for this FLIP which do not involve
> windows.
> > If so, could you help explain the use-case in this FLIP?
> I'll complete the scenarios in the FLIP.
>

Great! I am looking forward to understanding more about the use-cases.


> Best,
> Yun Gao
> --
> From:Dong Lin 
> Send Time:2022 Nov. 10 (Thu.) 11:43
> To:dev 
> Cc:Maximilian Michels 
> Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
> Timers on Job Termination
> Hi Piotr,
> I also think the scenario mentioned in this FLIP is useful to address. I am
> happy to discuss this together and figure out the more usable APIs.
> I guess the choice of API pretty much depends on when users need to use it.
> I am assuming it is needed when using dataStream.window(...). Is there any
> other case that needs this feature?
> It is mentioned in FLINK-18647
> (https://issues.apache.org/jira/browse/FLINK-18647) that we need the task
> thread to be blocked until the timer gets triggered on the registered time
> point. The JIRA and the FLIP 

Re: [DISCUSS] Allow sharing (RocksDB) memory between slots

2022-11-15 Thread Khachatryan Roman
Thanks for your reply, Xintong Song.

Could you please elaborate on the following:

> The proposed changes break several good properties that we designed for
> managed memory.
> 1. It's isolated across slots
Just to clarify, any way to manage the memory efficiently while capping its
usage
will break the isolation. It's just a matter of whether it's managed memory
or not.
Do you see any reasons why unmanaged memory can be shared, and managed
memory can not?

> 2. It should never be wasted (unless there's nothing in the job that needs
> managed memory)
If I understand correctly, the managed memory can already be wasted because
it is divided evenly between slots, regardless of the existence of its
consumers in a particular slot.
And in general, even if every slot has RocksDB / Python, equal
consumption is not guaranteed.
So this property would rather be fixed in the current proposal.
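To make the "already wasted" point concrete, here is a toy model of the current even per-slot split (the numbers below are purely illustrative): with 4 GiB of managed memory, 4 slots, and memory consumers in only 2 of them, half of the managed memory sits idle and cannot be used by the busy slots.

```java
public class ManagedMemorySplit {
    // Model of the even per-slot split of managed memory: each slot gets
    // size / numSlots, and one slot's share is unusable by other slots.
    public static long usableMb(long managedMb, int numSlots, int slotsWithConsumers) {
        long perSlotMb = managedMb / numSlots;
        return perSlotMb * slotsWithConsumers;
    }

    // Memory that sits idle because its slots host no managed-memory consumer.
    public static long wastedMb(long managedMb, int numSlots, int slotsWithConsumers) {
        return managedMb - usableMb(managedMb, numSlots, slotsWithConsumers);
    }
}
```

For example, usableMb(4096, 4, 2) is 2048 and wastedMb(4096, 4, 2) is 2048, i.e. half the budget is stranded in consumer-less slots.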

> In addition, it further complicates configuration / computation logics of
> managed memory.
I think having multiple options overriding each other increases the
complexity for the user. As for the computation, I think it's desirable to
let Flink do it rather than users.

Both approaches need some help from TM for:
- storing the shared resources (static field in a class might be too
dangerous because if the backend is loaded by the user-class-loader then
memory will leak silently).
- reading the configuration

Regards,
Roman


On Sun, Nov 13, 2022 at 11:24 AM Xintong Song  wrote:

> I like the idea of sharing RocksDB memory across slots. However, I'm quite
> concerned by the current proposed approach.
>
> The proposed changes break several good properties that we designed for
> managed memory.
> 1. It's isolated across slots
> 2. It should never be wasted (unless there's nothing in the job that needs
> managed memory)
> In addition, it further complicates configuration / computation logics of
> managed memory.
>
> As an alternative, I'd suggest introducing a variant of
> RocksDBStateBackend, that shares memory across slots and does not use
> managed memory. This basically means the shared memory is not considered as
> part of managed memory. For users of this new feature, they would need to
> configure how much memory the variant state backend should use, and
> probably also a larger framework-off-heap / jvm-overhead memory. The latter
> might require a bit extra user effort compared to the current approach, but
> would avoid significant complexity in the managed memory configuration and
> calculation logics which affects broader users.
>
>
> Best,
>
> Xintong
>
>
>
> On Sat, Nov 12, 2022 at 1:21 AM Roman Khachatryan 
> wrote:
>
> > Hi John, Yun,
> >
> > Thank you for your feedback
> >
> > @John
> >
> > > It seems like operators would either choose isolation for the cluster’s
> > jobs
> > > or they would want to share the memory between jobs.
> > > I’m not sure I see the motivation to reserve only part of the memory
> for
> > sharing
> > > and allowing jobs to choose whether they will share or be isolated.
> >
> > I see two related questions here:
> >
> > 1) Whether to allow mixed workloads within the same cluster.
> > I agree that most likely all the jobs will have the same "sharing"
> > requirement.
> > So we can drop "state.backend.memory.share-scope" from the proposal.
> >
> > 2) Whether to allow different memory consumers to use shared or exclusive
> > memory.
> > Currently, only RocksDB is proposed to use shared memory. For python,
> it's
> > non-trivial because it is job-specific.
> > So we have to partition managed memory into shared/exclusive and
> therefore
> > can NOT replace "taskmanager.memory.managed.shared-fraction" with some
> > boolean flag.
> >
> > I think your question was about (1), just wanted to clarify why the
> > shared-fraction is needed.
> >
> > @Yun
> >
> > > I am just curious whether this could really bring benefits to our users
> > with such complex configuration logic.
> > I agree, and configuration complexity seems a common concern.
> > I hope that removing "state.backend.memory.share-scope" (as proposed
> above)
> > reduces the complexity.
> > Please share any ideas of how to simplify it further.
> >
> > > Could you share some real experimental results?
> > I did an experiment to verify that the approach is feasible,
> > i.e. multilple jobs can share the same memory/block cache.
> > But I guess that's not what you mean here? Do you have any experiments in
> > mind?
> >
> > > BTW, as talked before, I am not sure whether different lifecycles of
> > RocksDB state-backends
> > > would affect the memory usage of block cache & write buffer manager in
> > RocksDB.
> > > Currently, all instances would start and destroy nearly simultaneously,
> > > this would change after we introduce this feature with jobs running at
> > different scheduler times.
> > IIUC, the concern is that closing a RocksDB instance might close the
> > BlockCache.
> > I checked that manually and it seems to work as expected.
> > And 

Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-15 Thread Etienne Chauchot

Hi,

Any feedback on the API benchmark article below?

Best

Etienne

Le 09/11/2022 à 12:18, Etienne Chauchot a écrit :


Hi,

And by the way, I was planning on writing another article to compare 
the performance of the DataSet, DataStream and SQL APIs over TPC-DS 
query3. I thought that I could run the pipelines on an Amazon EMR 
cluster with different data sizes: 1GB, 100GB, 1TB.


Would it be worth it, what do you think ?

Best

Etienne

Le 09/11/2022 à 10:04, Etienne Chauchot a écrit :


Hi Yun Gao,

thanks for your email and your review !

My comments are inline

Le 08/11/2022 à 06:51, Yun Gao a écrit :

Hi Etienne,

Many thanks for the article! Flink is indeed continuously 
increasing its
ability to unify batch / stream processing with the same API, and 
it's a great
pleasure that more and more users are trying this functionality. But 
I also

have some questions regarding some details.

First IMO, as a whole for the long run Flink will have two unified 
APIs, namely Table / SQL
API and DataStream API. Users could express the computation logic 
with these two APIs

for both bounded and unbounded data processing.



Yes that is what I understood also throughout the discussions and 
jiras. And I also think IMHO that reducing the number of APIs to 2 
was the good move.




Underlying Flink provides two
execution modes:  the streaming mode works with both bounded and 
unbounded data,
and it executes in a way of incremental processing based on state; 
the batch mode works
only with bounded data, and it executes level-by-level 
similar to the traditional

batch processing frameworks. Users could switch the execution mode via
EnvironmentSettings.inBatchMode() for 
StreamExecutionEnvironment.setRuntimeMode().


As recommended in Flink docs(1) I have enabled the batch mode as I 
thought it would be more efficient on my bounded pipeline, but as a 
matter of fact the streaming mode seems to be more efficient on my 
use case. I'll test with higher volumes to confirm.





Specifically for DataStream, as implemented in FLIP-140, currently all 
the existing DataStream
operations support the batch execution mode in a unified way[1]: 
data will be sorted for the
keyBy() edges according to the key, then the following operations 
like reduce() could receive
all the data belonging to the same key consecutively, and could 
directly reduce the records
of the same key without maintaining intermediate state. In this 
way users could write the

same code for both streaming and batch processing.
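The sort-based execution described above can be modeled in plain Java (a simplified model, not Flink code): sort the bounded input by key so that all records of one key are consecutive, then reduce each run with a single accumulator instead of a per-key state backend.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SortBasedReduceModel {
    // Model of batch-mode keyBy().reduce(): sort by key, then fold each
    // consecutive run of the same key into one result.
    public static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(input);
        sorted.sort(Map.Entry.comparingByKey());

        Map<String, Integer> result = new LinkedHashMap<>();
        String currentKey = null;
        int acc = 0;
        for (Map.Entry<String, Integer> record : sorted) {
            if (!record.getKey().equals(currentKey)) {
                if (currentKey != null) {
                    result.put(currentKey, acc);  // emit the finished key
                }
                currentKey = record.getKey();
                acc = 0;
            }
            acc += record.getValue();  // the "reduce" function, here: sum
        }
        if (currentKey != null) {
            result.put(currentKey, acc);
        }
        return result;
    }
}
```

Because records of one key arrive consecutively, only one accumulator is alive at a time, which is exactly why batch mode needs no incremental keyed state.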



Yes I have no doubt that my resulting Query3ViaFlinkRowDatastream 
pipeline will work with no modification if I plug an unbounded source 
to it.





# Regarding the migration of Join / Reduce

First I think Reduce is always supported and users could write 
dataStream.keyBy().reduce(xx)
directly, and if batch execution mode is set, the reduce will not 
be executed in an incremental way;
instead it acts much like sort-based aggregation in 
traditional batch processing frameworks.


Regarding Join, the issue of FLINK-22587 indeed exists: the 
current join has to be bound
to a window and the GlobalWindow does not work properly. But with 
a bit more effort,
users do not need to rewrite the whole join from scratch: 
they could write a dedicated
window assigner that assigns all the records to the same window 
instance and returns
EventTimeTrigger.create() as the default event-time trigger [2]. 
Then it works:


source1.join(source2)
                .where(a -> a.f0)
                .equalTo(b -> b.f0)
                .window(new EndOfStreamWindows())
                .apply();

It does not require records to have event-time attached since the 
window trigger relies only
on the time range of the window, and the assignment does not 
need event-time either.


The behavior of the join is also similar to a sort-based join if batch 
mode is enabled.
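For reference, a sketch of what such an "EndOfStreamWindows" assigner could look like against the DataStream windowing API. This is a hedged sketch: the class name comes from the snippet above and is not something shipped by Flink at the time of writing, so treat it as one possible implementation rather than the official one.

```java
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Assigns every record to one window spanning all of time, so the
// EventTimeTrigger only fires when the watermark advances to
// Long.MAX_VALUE, i.e. at the end of the bounded input.
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow ALL_OF_TIME =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(ALL_OF_TIME);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
```

With this assigner in place, the `source1.join(source2)…window(new EndOfStreamWindows())` snippet above compiles and fires once at end of input.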


Of course it is not ideal to ask users to apply this workaround, and 
we'll try to fix this issue in 1.17.



Yes, this is a better workaround than the manual state-based join 
that I proposed. I tried it and it works perfectly with similar 
performance. Thanks.




# Regarding support of Sort / Limit

Currently these two operators are indeed not supported in the 
DataStream API directly. One initial
thought for these two operations is that users may convert the 
DataStream to Table API and use

Table API for these two operators:

DataStream xx = ... // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(dataStream);
tableXX.orderBy($("a").asc());



Yes I knew that workaround but I decided not to use it because I have 
a special SQL based implementation (for comparison reasons) so I did 
not want to mix SQL and DataStream APIs in the same pipeline.




How do you think about this option? We are also assessing if the 
combination of DataStream
API / Table API is sufficient for all the 

Re: [DISCUSS] FLIP-268: Rack Awareness for Kafka Sources

2022-11-15 Thread Dong Lin
Hi Jeremy,

Thanks for creating the FLIP! I think this is a pretty useful feature for
users to save operation costs. It would be great to have this feature in
Flink.

I have a couple of questions below:

1) Could you update the FLIP to specify the concrete signature of the
setRackId(..) method proposed in this FLIP? For example, it will be useful
to know the signature of the callback expected by this method.

2) Could you explain how are users expected to implement this callback to
provide the right rackId at runtime? For example, it will be useful to know
how to get the right rack id of the subtask/TM at runtime.
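For question 2), one hedged possibility (purely illustrative; the helper and the environment-variable convention below are assumptions, not part of the FLIP) is to resolve the rack from something the deployment injects into each TaskManager process, e.g. an env var carrying the availability zone, set via the Kubernetes downward API or the AWS instance metadata at pod startup:

```java
import java.util.function.Supplier;

public class RackIdSuppliers {
    // Resolve the rack id from an environment variable injected into the
    // TaskManager process; fall back to the empty string when unset,
    // which a consumer could interpret as "no rack preference".
    public static Supplier<String> fromEnv(String envVarName) {
        return () -> {
            String rack = System.getenv(envVarName);
            return rack == null ? "" : rack;
        };
    }
}
```

Such a supplier would run on the task manager at consumer-configuration time, so each subtask picks up the zone of the host it was actually scheduled onto.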

3) Any chance we can provide a default callback implementation that users
can use out-of-the-box, instead of asking users to write a non-trivial
callback?

4) The Java API proposed in this FLIP allows DataStream users to enjoy this
feature. It could be great to also allow Flink SQL users to use this
feature. We can probably add an option to the Kafka connector

so that users can specify it in SQL. This can be done in a separate FLIP if
needed.

BTW, the Kafka config used by the proposed FLIP, i.e. client.rack, is added
in KIP-268.
It might be useful to also reference this KIP in addition to KIP-36.


Cheers,
Dong


On Fri, Nov 11, 2022 at 5:36 AM Jeremy DeGroot 
wrote:

> Kafka has a rack awareness feature that allows brokers and consumers to
> communicate about the rack (or AWS Availability Zone) they're located in.
> Reading from a local broker can save money in bandwidth and improve latency
> for your consumers.
>
> This improvement proposes that a Kafka Consumer could be configured with a
> callback that could be run when it's being configured on the task manager,
> that will set the appropriate value at runtime if a value is provided.
>
> More detail about this proposal can be found at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-268%3A+Kafka+Rack+Awareness
>
>
> More information about the Kafka rack awareness feature is at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams
>
>
> Best,
>
> Jeremy
>


Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

2022-11-15 Thread Dawid Wysakowicz

Hey all,

Sorry for being rather late to the party. I'd like to chime in with a few 
thoughts.


I agree there are at least two aspects to the problem:

1) lower level handling (StreamOperator)

2) higher level APIs (WindowOperator, CEP, ProcessFunction, ...)

First is how do we want to deal with it on a lower level. By lower level 
I mean AbstractStreamOperator and lower. To be honest I am against 
adding a property to every registered timer, as it is in the current 
proposal. It introduces many problems for, in my opinion, really rare 
case. In the end it applies only to jobs that finish and have registered 
processing timers.


Some problems that I see:

 * Changes the checkpoint format and stores most of the time
   unnecessary information.
 * We'd need to somehow inform the ProcessingTimeCallback (and the like)
   that a timer is triggered at the end of the job. This is necessary at
   least, so that implementors know they should not register any new
   timers.
 * Introduces a method overload. I guess we would want to deprecate the
   method without TimerEndAction at some point.

I'd rather go with a variation of the other proposal from FLINK-18647 
and add a hook for what should happen at the end of input. I look at the 
problem rather from the perspective of endOfInput, not individual 
timers. This could be something like:


public interface ProcessingTimeService {
  void registerOnEndOfInputHandler(Consumer<Long> onEndOfInput);

  // we could even think of having a single callback for all timers at once
  void registerOnEndOfInputHandler(Consumer<List<Long>> onEndOfInput);
}


As for the higher level API, I'm not convinced to having a single

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    …
    void setTimerTerminationAction(TimerTerminationAction action);
    …
}

All of the discussed APIs have/are specific DSLs and introducing a 
disjoint approach does not seem natural to me. Users do not/should not 
think about timers when they define a Window or CEP pattern. They deal 
with triggers/windows or timeout conditions. Moreover I strongly believe 
in each of these cases there is only a single valid behaviour. I can't 
really imagine why would anyone want to discard windows at the end.
There is the argument of "waiting for the exact time", but this is of 
questionable use to me and I'd prefer to see a real request for that. So 
far, the way I understood it, it's more of "we think somebody may come up 
with such a request".
My suggestion would be to simply change the behaviour for WindowOperator 
and CEPOperator without adding any additional API. I understand it 
changes the semantics a bit, but a) I see it as defining so far 
undefined behaviour b) again, can't imagine someone depends on the 
broken behaviour where the contents of the last windows are dropped. We could of 
course add a way to change the behaviour back if it annoys someone. On 
this topic I'm really open for discussion.
This would leave only the ProcessFunction to consider, as there one can 
implement any custom handling. In my opinion this can be approached 
similarly to the lower APIs, by introducing a hook for timers at the 
endOfInput, e.g.


public abstract class KeyedProcessFunction<K, I, O> extends 
AbstractRichFunction {

  public void onEndOf(Timers)/(Input)(...) {}

 // or maybe
 public void onUntriggeredTimer(long timer) {}
}

In either of the above methods there would be no Context, so no way to 
register new timers.

Let me know what you think.
Best,
Dawid
On 15/11/2022 09:17, Yun Gao wrote:

Hi Dong,
Very thanks for the discussion!

It appears that the issues mentioned in the motivation section
are all related to using Windows on the DataStream API, where
the user's code typically does not have anything to do with Timer.

IMO it might not be bound only to the case of window operators. For
example, users might implement some complex aggregation logic
with ProcessFunction directly. In this case, users might also want to
control how these timers are handled at the end of the stream.

IMO, the main benefit of this alternative solution is that it is more
consistent with the existing Windows API. Users who are concerned
with firing windows on end-of-stream won't need to additionally
understand/handle timer.

First to summary the problem, currently it could be divided into two
layers:
1. In the lower layer we need to support different actions to deal with
the timers at the end of the stream (though in fact we need to deduct
whether we need this ability from the api, but for simplicity I'll first 
describe
this layer since the divergency happen in the higher level).
2. How we let users to specify the actions at the end of the timer? Currently
we have different options on this layer.
  - The first option is to have a unified SingleOperatorStream#
  setTimerTerminationAction.
  - The second option is to have a specialized trigger for the window.
With whichever interface, in the window operator need to set proper

[jira] [Created] (FLINK-30028) A little fix with document about period in log.changelog-mode description.

2022-11-15 Thread Hang HOU (Jira)
Hang HOU created FLINK-30028:


 Summary: A little fix with document about period in 
log.changelog-mode description. 
 Key: FLINK-30028
 URL: https://issues.apache.org/jira/browse/FLINK-30028
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: 1.16.0
 Environment: Flink 0.16.0
Reporter: Hang HOU


There is an extra period at the end of the "log.changelog-mode" description.
[log.changelog-mode|https://nightlies.apache.org/flink/flink-table-store-docs-release-0.2/docs/development/configuration/#coreoptions]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on Job Termination

2022-11-15 Thread Yun Gao
Hi Dong,
Very thanks for the discussion!
> It appears that the issues mentioned in the motivation section 
> are all related to using Windows on the DataStream API, where 
> the user's code typically does not have anything to do with Timer.
IMO it might not be bound only to the case of window operators. For
example, users might implement some complex aggregation logic
with ProcessFunction directly. In this case, users might also want to 
control how these timers are dealt with at the end of the stream. 
> IMO, the main benefit of this alternative solution is that it is more
> consistent with the existing Windows API. Users who are concerned
> with firing windows on end-of-stream won't need to additionally 
> understand/handle timer.
First, to summarize the problem: currently it can be divided into two
layers: 
1. In the lower layer we need to support different actions for dealing with 
the timers at the end of the stream (in fact we need to deduce whether we 
need this ability from the API, but for simplicity I'll describe this layer 
first, since the divergence happens at the higher level).
2. How do we let users specify the actions for timers at the end of the 
stream? Currently we have different options at this layer. 
 - The first option is to have a unified SingleOperatorStream#
 setTimerTerminationAction. 
 - The second option is to have a specialized trigger for the window. 
With whichever interface, the window operator needs to set the proper 
termination actions according to the specified semantics when registering 
timers. 
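As a rough, Flink-free sketch of what per-timer termination actions could boil down to inside an operator, the following toy models the three semantics discussed in this thread (cancel, trigger immediately, wait). The names `TimerTerminationAction` and `drainAtEndOfInput` are hypothetical, not the proposed API; a plain `PriorityQueue` stands in for Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TerminationActionSketch {

    // Hypothetical per-timer actions matching the three semantics discussed.
    enum TimerTerminationAction { CANCEL, TRIGGER_IMMEDIATELY, WAIT_FOR_FIRING }

    // A pending timer carries its own termination action, set when registered.
    record Timer(long timestamp, TimerTerminationAction action) {}

    static List<String> drainAtEndOfInput(PriorityQueue<Timer> timers) {
        List<String> log = new ArrayList<>();
        while (!timers.isEmpty()) {
            Timer t = timers.poll();
            switch (t.action()) {
                case CANCEL -> log.add("cancelled@" + t.timestamp());
                case TRIGGER_IMMEDIATELY -> log.add("fired@" + t.timestamp());
                case WAIT_FOR_FIRING ->
                    // In a real operator this branch would block the task
                    // thread until processing time reaches t.timestamp();
                    // here we only record the decision.
                    log.add("waited+fired@" + t.timestamp());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        PriorityQueue<Timer> timers = new PriorityQueue<>(
            (a, b) -> Long.compare(a.timestamp(), b.timestamp()));
        timers.add(new Timer(10L, TimerTerminationAction.CANCEL));
        timers.add(new Timer(20L, TimerTerminationAction.TRIGGER_IMMEDIATELY));
        System.out.println(drainAtEndOfInput(timers)); // prints [cancelled@10, fired@20]
    }
}
```

Whichever of the two user-facing options is chosen, the operator-internal mechanism would look roughly like this drain loop; the options differ only in how the action attached to each timer gets specified.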
On the other side, specifically for the WindowOperator, the interface might not 
only be related to the timers: there are also window types, e.g. CountWindow,
 that also need to specify their behavior at the end of the stream.
Therefore, for window operators it looks to me that it would indeed be more 
user-friendly to have a uniform API. Since different operators might have 
different situations, I wonder whether it would be better if we first:
1. Let operator authors still set the default actions when registering 
timers.
2. Let each operator consider its API distinctly: 
 - The window operator provides a uniform API.
 - Apart from windows, currently it looks to me that users can register 
customized timers only with the family of ProcessFunctions. Users could still 
set actions for each timer, and we may first only provide a method on 
ProcessOperator to change the per-timer actions uniformly when building 
the DAG? 
> we need the task thread to be blocked until the timer gets triggered on the 
> registered time
> point. 
Currently I do not have real-life scenarios, but some legitimate cases are:
- Users want the job to stop at the boundary of windows when stopping the job 
with savepoint --drain.
- Users have timers that periodically emit messages to external systems, and 
want to emit one finalizing message at the end of the stream. 
But I also think we could add more actions step by step. 
> I might have missed use-cases for this FLIP which do not involve windows. 
> If so, could you help explain the use-case in this FLIP?
I'll complete the scenarios in the FLIP.
Best,
Yun Gao
--
From:Dong Lin 
Send Time:2022 Nov. 10 (Thu.) 11:43
To:dev 
Cc:Maximilian Michels 
Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers on 
Job Termination
Hi Piotr,
I also think the scenario mentioned in this FLIP is useful to address. I am
happy to discuss this together and figure out the more usable APIs.
I guess the choice of API pretty much depends on when users need to use it.
I am assuming it is needed when using dataStream.window(...). Is there any
other case that needs this feature?
It is mentioned in FLINK-18647 that we need the task
thread to be blocked until the timer gets triggered at the registered time
point. The JIRA and the FLIP do not seem to provide the use-case for this
feature. Could you explain more about the use-cases that might need this
feature?
What do you think of the alternative API vs. the approach proposed in the
FLIP? Maybe we can continue the discussion by detailing the pros/cons.
Best,
Dong
On Wed, Nov 9, 2022 at 4:35 PM Piotr Nowojski  wrote:
> Hi all,
>
> Big thanks to Yun Gao for driving this!
>
> > I wonder whether we need to add a new option when registering timers.
> Won't
> > it be sufficient to flush all pending timers on termination but not allow
> > new ones to be registered?
>
> Maximilian, I'm sure that single semantics is not enough. All three that
> are proposed here (cancel, wait, trigger immediately) were requested by
> users.
>
> Dong, as I initially wrote in the above-mentioned ticket [1] I'm personally
> open to discussions about the final shape of the API.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18647 