Re: [ANNOUNCE] Documentation now available at nightlies.apache.org

2021-09-09 Thread Chesnay Schepler

To me it does not sound any less official than "ci.apache.org".

flink.apache.org content is served directly from Git; we would have to 
commit the compiled content to the flink-web repository.
This would require changes to our current workflow and would likely 
weaken our current approach in the process: making a change in flink 
would no longer automatically deploy the docs, you'd have to add them 
manually to flink-web, and then we'd pretty much only do it for 
releases.


In short, that's not something we can change quickly.

On 10/09/2021 06:41, Jark Wu wrote:

Thanks Chesnay for the migration work,

However, I think the domain name "nightlies.apache.org" does not sound like
an official address, and the current documentation URL,
https://nightlies.apache.org/flink/flink-docs-release-1.14/, is a bit long.

Is it possible to migrate to https://flink.apache.org/, e.g. with a short
link like https://flink.apache.org/docs/1.14/, which sounds more official
and is easier to remember? Maybe it's also a good chance to announce a new domain.

Best,
Jark



On Fri, 10 Sept 2021 at 11:23, Leonard Xu  wrote:


Thanks Chesnay for the migration work.

Should we add a redirect for the old documentation site,
https://ci.apache.org/flink/flink-docs-master/, so that it
redirects to the new one,
https://nightlies.apache.org/flink/flink-docs-master/ ?

The bookmarks in users’ browsers will still point to the old site; I googled
"flink documents", which also returned the old one.
And the old site won’t be updated, so it will soon be outdated.

Best,
Leonard


On 6 Sep 2021, at 17:11, Chesnay Schepler wrote:

Website has been updated to point to nightlies.apache.org as well.

On 03/09/2021 08:03, Chesnay Schepler wrote:

The migration is pretty much complete and the documentation is now
available at nightlies.apache.org.

Please click around a bit and check if anything is broken.

If no issues are reported by the end of today I will update the links
on the website.

On 01/09/2021 10:11, Chesnay Schepler wrote:

We are in the final steps of migrating the documentation to the new
buildbot setup.

Because of that the documentation currently available at ci.apache.org
will NOT be updated until further notice because the old builders have been
deactivated while we iron out kinks in the new ones.

I will keep you updated on the progress.







Re: [ANNOUNCE] Documentation now available at nightlies.apache.org

2021-09-09 Thread Chesnay Schepler

A redirect will be set up by Infra at some point.

On 10/09/2021 05:23, Leonard Xu wrote:

Thanks Chesnay for the migration work.

Should we add a redirect for the old documentation site,
https://ci.apache.org/flink/flink-docs-master/, so that it redirects to the
new one, https://nightlies.apache.org/flink/flink-docs-master/ ?

The bookmarks in users’ browsers will still point to the old site; I googled
"flink documents", which also returned the old one.
And the old site won’t be updated, so it will soon be outdated.
  
Best,

Leonard


On 6 Sep 2021, at 17:11, Chesnay Schepler wrote:

Website has been updated to point to nightlies.apache.org as well.

On 03/09/2021 08:03, Chesnay Schepler wrote:

The migration is pretty much complete and the documentation is now available at 
nightlies.apache.org.

Please click around a bit and check if anything is broken.

If no issues are reported by the end of today I will update the links on the 
website.

On 01/09/2021 10:11, Chesnay Schepler wrote:

We are in the final steps of migrating the documentation to the new buildbot 
setup.

Because of that the documentation currently available at ci.apache.org will NOT 
be updated until further notice because the old builders have been deactivated 
while we iron out kinks in the new ones.

I will keep you updated on the progress.





[jira] [Created] (FLINK-24239) Event time temporal join should support values from array, map, row, etc. as join key

2021-09-09 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-24239:
---

 Summary: Event time temporal join should support values from 
array, map, row, etc. as join key
 Key: FLINK-24239
 URL: https://issues.apache.org/jira/browse/FLINK-24239
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.12.6, 1.13.3, 1.15.0, 1.14.1
Reporter: Caizhi Weng


This ticket is from the [mailing 
list|https://lists.apache.org/thread.html/r90cab9c5026e527357d58db70d7e9b5875e57b942738f032bd54bfd3%40%3Cuser-zh.flink.apache.org%3E].

Currently, in an event time temporal join, when the join keys come from an
array, map or row, an exception is thrown saying "Currently the join key in
Temporal Table Join can not be empty". This is quite confusing for users, as
they have already set the join keys.

Add the following test case to {{TableEnvironmentITCase}} to reproduce this 
issue.
{code:scala}
@Test
def myTest(): Unit = {
  tEnv.executeSql(
    """
      |CREATE TABLE A (
      |  a MAP<STRING, INT>,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql(
    """
      |CREATE TABLE B (
      |  id INT,
      |  ts TIMESTAMP(3),
      |  WATERMARK FOR ts AS ts
      |) WITH (
      |  'connector' = 'values'
      |)
      |""".stripMargin)
  tEnv.executeSql(
    "SELECT * FROM A LEFT JOIN B FOR SYSTEM_TIME AS OF A.ts AS b ON A.a['ID'] = id"
  ).print()
}
{code}
The exception stack is
{code:java}
org.apache.flink.table.api.ValidationException: Currently the join key in 
Temporal Table Join can not be empty.

at 
org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromGeneralTemporalTableRule.onMatch(LogicalCorrelateToJoinFromTemporalTableRule.scala:272)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 

Re: [ANNOUNCE] Documentation now available at nightlies.apache.org

2021-09-09 Thread Jark Wu
Thanks Chesnay for the migration work,

However, I think the domain name "nightlies.apache.org" does not sound like
an official address, and the current documentation URL,
https://nightlies.apache.org/flink/flink-docs-release-1.14/, is a bit long.

Is it possible to migrate to https://flink.apache.org/, e.g. with a short
link like https://flink.apache.org/docs/1.14/, which sounds more official
and is easier to remember? Maybe it's also a good chance to announce a new domain.

Best,
Jark



On Fri, 10 Sept 2021 at 11:23, Leonard Xu  wrote:

> Thanks Chesnay for the migration work.
>
> Should we add a redirect for the old documentation site,
> https://ci.apache.org/flink/flink-docs-master/, so that it
> redirects to the new one,
> https://nightlies.apache.org/flink/flink-docs-master/ ?
>
> The bookmarks in users’ browsers will still point to the old site; I googled
> "flink documents", which also returned the old one.
> And the old site won’t be updated, so it will soon be outdated.
>
> Best,
> Leonard
>
> > On 6 Sep 2021, at 17:11, Chesnay Schepler wrote:
> >
> > Website has been updated to point to nightlies.apache.org as well.
> >
> > On 03/09/2021 08:03, Chesnay Schepler wrote:
> >> The migration is pretty much complete and the documentation is now
> >> available at nightlies.apache.org.
> >>
> >> Please click around a bit and check if anything is broken.
> >>
> >> If no issues are reported by the end of today I will update the links
> >> on the website.
> >>
> >> On 01/09/2021 10:11, Chesnay Schepler wrote:
> >>> We are in the final steps of migrating the documentation to the new
> >>> buildbot setup.
> >>>
> >>> Because of that the documentation currently available at ci.apache.org
> >>> will NOT be updated until further notice because the old builders have been
> >>> deactivated while we iron out kinks in the new ones.
> >>>
> >>> I will keep you updated on the progress.
> >>>
> >>
> >
>
>


Re: [ANNOUNCE] Documentation now available at nightlies.apache.org

2021-09-09 Thread Leonard Xu
Thanks Chesnay for the migration work.

Should we add a redirect for the old documentation site,
https://ci.apache.org/flink/flink-docs-master/, so that it redirects to the
new one, https://nightlies.apache.org/flink/flink-docs-master/ ?

The bookmarks in users’ browsers will still point to the old site; I googled
"flink documents", which also returned the old one.
And the old site won’t be updated, so it will soon be outdated.
 
Best,
Leonard

> On 6 Sep 2021, at 17:11, Chesnay Schepler wrote:
> 
> Website has been updated to point to nightlies.apache.org as well.
> 
> On 03/09/2021 08:03, Chesnay Schepler wrote:
>> The migration is pretty much complete and the documentation is now available 
>> at nightlies.apache.org.
>> 
>> Please click around a bit and check if anything is broken.
>> 
>> If no issues are reported by the end of today I will update the links on the 
>> website.
>> 
>> On 01/09/2021 10:11, Chesnay Schepler wrote:
>>> We are in the final steps of migrating the documentation to the new 
>>> buildbot setup.
>>> 
>>> Because of that the documentation currently available at ci.apache.org will 
>>> NOT be updated until further notice because the old builders have been 
>>> deactivated while we iron out kinks in the new ones.
>>> 
>>> I will keep you updated on the progress.
>>> 
>> 
> 



[VOTE] FLIP-173: Support DAG of algorithms

2021-09-09 Thread Dong Lin
Hi all,

We would like to start the vote for FLIP-173: Support DAG of
algorithms [1]. This FLIP was discussed in this thread [2].

The proposal extends the Flink ML API to support DAGs of algorithms, where
each algorithm can have multiple inputs and multiple outputs. It also
extends the Flink ML API to support online learning scenarios, where a
long-running Model instance needs to be continuously updated with the latest
model data generated by another long-running Estimator instance.

The vote will be open for at least 72 hours, following the consensus voting
process.

Thanks!
Dong Lin and Zhipeng Zhang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615783
[2]
https://lists.apache.org/thread.html/r6729f351fb1bc13a93754c199d5fee1051cc8146e22374737c578779%40%3Cdev.flink.apache.org%3E


[jira] [Created] (FLINK-24238) Page title missing

2021-09-09 Thread Jun Qin (Jira)
Jun Qin created FLINK-24238:
---

 Summary: Page title missing
 Key: FLINK-24238
 URL: https://issues.apache.org/jira/browse/FLINK-24238
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.13.2
Reporter: Jun Qin


The page title is missing on this Flink doc:
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/versioned_tables/].

[This one|https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/] is a good example.





[jira] [Created] (FLINK-24237) Consider disabling or lowering DNS caching timeout in docker image

2021-09-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-24237:


 Summary: Consider disabling or lowering DNS caching timeout in 
docker image
 Key: FLINK-24237
 URL: https://issues.apache.org/jira/browse/FLINK-24237
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: Chesnay Schepler
 Fix For: 1.15.0


A recent [blog post|https://mux.com/blog/5-years-of-flink-at-mux/] by Mux 
mentions that they have disabled DNS caching in their docker image because, 
without it, the Flink processes had trouble talking to other services when 
deployments moved between nodes:

??The JVM will cache DNS entries forever by default. This is undesirable in 
Kubernetes deployments where there’s an expectation that DNS entries can and do 
change frequently as pod deployments move between nodes. We’ve seen Flink 
applications suddenly unable to talk to other services in the cluster after 
pods are upgraded.??

We should investigate this, in particular whether it would make sense to set it 
to a low value instead of disabling it entirely.
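
If we go for a low value rather than disabling the cache, the standard 
{{networkaddress.cache.ttl}} security property is the usual knob. A minimal 
sketch of setting it programmatically; it has to run before the first lookup, 
and the same property can instead be set in the JVM's {{java.security}} file 
or in the docker entrypoint:

{code:java}
import java.security.Security;

public class DnsCacheSettings {
    public static void main(String[] args) {
        // Cache successful lookups for 60 seconds instead of indefinitely.
        Security.setProperty("networkaddress.cache.ttl", "60");
        // Optionally also bound the cache for failed lookups.
        Security.setProperty("networkaddress.cache.negative.ttl", "10");
    }
}
{code}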





[jira] [Created] (FLINK-24236) Migrate tests to reporter factories

2021-09-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-24236:


 Summary: Migrate tests to reporter factories
 Key: FLINK-24236
 URL: https://issues.apache.org/jira/browse/FLINK-24236
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Metrics, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.15.0


In preparation for FLINK-24235, migrate tests to use factories where the 
instantiation path is not relevant (i.e., everything that is not the 
ReporterSetupTest).





[jira] [Created] (FLINK-24235) Restrict reporter support to plugins

2021-09-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-24235:


 Summary: Restrict reporter support to plugins
 Key: FLINK-24235
 URL: https://issues.apache.org/jira/browse/FLINK-24235
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Metrics
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.15.0


Metric reporters can currently be instantiated in one of two ways:
a) the reporter class is loaded via reflection
b) the reporter factory is loaded via reflection/ServiceLoader (aka, plugins)

All reporters provided by Flink use the factory approach, and it is preferable 
because it supports plugins. The factory approach has also been available since 
1.11, and I think it's fair to remove the old approach by now.
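
For reference, the factory approach boils down to implementing 
{{MetricReporterFactory}} and registering it via ServiceLoader. A minimal 
sketch ({{NoOpReporterFactory}} is a hypothetical name, not an existing 
reporter):

{code:java}
import java.util.Properties;

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

/** Hypothetical no-op reporter, instantiated via the plugin/factory path. */
public class NoOpReporterFactory implements MetricReporterFactory {

    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new MetricReporter() {
            @Override
            public void open(MetricConfig config) {}

            @Override
            public void close() {}

            @Override
            public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {}

            @Override
            public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {}
        };
    }
}
{code}

The factory is discovered through a 
META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory entry 
in the plugin jar; the class-reflection path (a) loads the reporter from the 
main classpath instead, which is exactly why it bypasses plugin isolation.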





[jira] [Created] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24234:
--

 Summary: [FLIP-171] Byte Based & Time Based Flushing for 
AsyncSinkBase
 Key: FLINK-24234
 URL: https://issues.apache.org/jira/browse/FLINK-24234
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support for 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
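
To illustrate the "lightweight shim" idea in plain Java (this is not the 
FLIP-171 API; the actual interfaces are defined in the FLIP linked below):

{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Illustrative only: the generic sink owns batching, retries and
 * checkpointing; a destination shim only knows how to hand one batch to the
 * destination's async client.
 */
interface DestinationShim<RequestEntryT> {

    /**
     * Submits one batch asynchronously. The future completes with the subset
     * of entries that failed and should be requeued by the generic sink.
     */
    CompletableFuture<List<RequestEntryT>> submitBatch(List<RequestEntryT> batch);
}
{code}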
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink





[jira] [Created] (FLINK-24233) Receiving new buffer size before network reader configured

2021-09-09 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24233:
-

 Summary: Receiving new buffer size before network reader configured
 Key: FLINK-24233
 URL: https://issues.apache.org/jira/browse/FLINK-24233
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.14.1



{noformat}
2021-09-09 14:36:42,383 WARN  org.apache.flink.runtime.taskmanager.Task 
   [] - Map -> Flat Map (71/75)#0 (7a5b971e0cd57aa5d057a114e2679b03) 
switched from RUNNING to FAILED with failure c
ause: 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Fatal error at remote task manager 
'ip-172-31-22-183.eu-central-1.compute.internal/172.31.22.183:42085'.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:339)
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:240)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No reader for receiverId = 
296559f497c54a82534945f4549b9e2d exists.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.obtainReader(PartitionRequestQueue.java:194)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.notifyNewBufferSize(PartitionRequestQueue.java:188)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:134)
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at 

[jira] [Created] (FLINK-24232) Archiving of suspended jobs breaks subsequent archive attempts

2021-09-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-24232:


 Summary: Archiving of suspended jobs breaks subsequent 
archive attempts
 Key: FLINK-24232
 URL: https://issues.apache.org/jira/browse/FLINK-24232
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Chesnay Schepler


To archive a job, we write a file that uses the job ID as the name. Since 
suspended jobs are handled like other terminal jobs, they are also archived.

When that job later resumes, any attempt to archive it on termination will 
fail because an archive already exists.

The simplest option is to add a suffix if an archive already exists, like "_1".
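
A sketch of that suffix probing with plain {{java.nio.file}} (the class and 
method names are illustrative, not the actual archiving code):

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;

public final class ArchivePaths {

    /** Returns the first of jobId, jobId_1, jobId_2, ... that is not taken yet. */
    public static Path uniqueArchivePath(Path archiveDir, String jobId) {
        Path candidate = archiveDir.resolve(jobId);
        int attempt = 1;
        while (Files.exists(candidate)) {
            candidate = archiveDir.resolve(jobId + "_" + attempt++);
        }
        return candidate;
    }
}
{code}

Note that this check-then-act is racy; the real implementation would want to 
create the file atomically and retry on a FileAlreadyExistsException.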





[jira] [Created] (FLINK-24231) Buffer debloating microbenchmark for multiple gates

2021-09-09 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24231:
-

 Summary: Buffer debloating microbenchmark for multiple gates
 Key: FLINK-24231
 URL: https://issues.apache.org/jira/browse/FLINK-24231
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


The microbenchmark from 
https://issues.apache.org/jira/browse/FLINK-24230 needs to be expanded with 
scenarios where different gates have:
* different throughput
* different record size





[jira] [Created] (FLINK-24230) Buffer debloating microbenchmark for single gate

2021-09-09 Thread Anton Kalashnikov (Jira)
Anton Kalashnikov created FLINK-24230:
-

 Summary: Buffer debloating microbenchmark for single gate
 Key: FLINK-24230
 URL: https://issues.apache.org/jira/browse/FLINK-24230
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Anton Kalashnikov
 Fix For: 1.15.0


Currently, there are no microbenchmarks that check the effectiveness of buffer 
debloating. The idea is to create one that measures checkpoint time. The 
benchmark should be similar to `UnalignedCheckpointTimeBenchmark`, except that 
there the effect of buffer debloating is visible only for extremely small 
values such as 1ms for BUFFER_DEBLOAT_TARGET. This benchmark should provide a 
more reliable way to compare different buffer debloating implementations, 
which can be achieved by increasing at least the record size and the 
checkpoint interval. The main goal is to measure how long a checkpoint takes 
during backpressure when all buffers are full.
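
A rough JMH-style skeleton of what such a benchmark could look like (all 
names here are hypothetical; the actual benchmarks live in the 
flink-benchmarks repository):

{code:java}
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Thread)
public class BufferDebloatingCheckpointBenchmark {

    @Setup(Level.Trial)
    public void setUp() {
        // Hypothetical: start a backpressured job with large records and a
        // realistic checkpoint interval so that all buffers are full.
    }

    @Benchmark
    public void checkpointTimeUnderBackpressure() {
        // Hypothetical: trigger a checkpoint and block until it completes;
        // JMH then reports how long that takes.
    }
}
{code}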





[jira] [Created] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24229:
--

 Summary: [FLIP-171] DynamoDB implementation of Async Sink
 Key: FLINK-24229
 URL: https://issues.apache.org/jira/browse/FLINK-24229
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Firehose by inheriting the 
AsyncSinkBase class. The implementation can for now reside in its own module in 
flink-connectors. The module and package name can be anything reasonable e.g. 
{{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for Firehose by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How to best 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Created] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24228:
--

 Summary: [FLIP-171] Firehose implementation of Async Sink
 Key: FLINK-24228
 URL: https://issues.apache.org/jira/browse/FLINK-24228
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How to best 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]





[jira] [Created] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24227:
--

 Summary: [FLIP-171] KDS implementation of Async Sink
 Key: FLINK-24227
 URL: https://issues.apache.org/jira/browse/FLINK-24227
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support for 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink





Re: [DISCUSS] Automated architectural tests

2021-09-09 Thread Ingo Bürk
Great! I'll work on getting the PR into an actual, proper shape now,
including looking more carefully at the violations that were found and
eventually freezing the current violations (maybe fixing some quick wins first).

One more thing I just ran into is that ArchUnit doesn't explicitly support
Scala; while many things just work (since it's still byte code),
Scala-specific concepts like traits seem to cause issues. I'll have to
exclude Scala code from the checks for now, I think.
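
For readers following along, a minimal ArchUnit rule of the kind discussed
here might look as follows (a sketch, not one of the rules from the PR; the
TestLogger convention is the example mentioned further down in this thread):

{code:java}
import com.tngtech.archunit.core.domain.JavaClasses;
import com.tngtech.archunit.core.importer.ClassFileImporter;
import com.tngtech.archunit.lang.ArchRule;
import org.apache.flink.util.TestLogger;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.classes;

public class TestLoggerRuleSketch {
    public static void main(String[] args) {
        // In the real setup this would import the compiled test classes of
        // the modules under test.
        JavaClasses imported = new ClassFileImporter().importPackages("org.apache.flink");

        ArchRule rule = classes()
                .that().haveSimpleNameEndingWith("ITCase")
                .should().beAssignableTo(TestLogger.class);

        rule.check(imported);
    }
}
{code}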


Ingo

On Tue, Sep 7, 2021 at 5:03 PM Chesnay Schepler  wrote:

> I would say that's fine time-wise.
>
> On 07/09/2021 15:29, Ingo Bürk wrote:
> > Thanks, Chesnay. I updated the PR to use a separate module now, and ran
> > it on a few modules (some Table API modules and a couple connectors). The
> > CI seemed to take ~2.5min for executing the tests; that's certainly not
> > negligible. On the other hand, even the few tests implemented already
> > found several violations ("several" is an understatement, but I manually
> > verified some of them, not all of them).
> >
> > On Mon, Sep 6, 2021 at 3:44 PM Chesnay Schepler 
> > wrote:
> >
> >> While flink-tests is currently the best choice in that it has the
> >> biggest classpath, it is also the module already requiring the most time
> >> on CI.
> >>
> >> Furthermore, given that we ideally cover all APIs (including connectors
> >> & formats), having that mess of dependencies in flink-tests may
> >> interfere with existing / future tests.
> >>
> >> As such I would prefer a separate module, as annoying as that may be.
> >>
> >> On 06/09/2021 15:26, Ingo Bürk wrote:
> >>> I just quickly chatted with the author/maintainer of ArchUnit, and a
> >>> module which depends on every module that should be tested seems to be
> >>> the best solution. How do you feel about using flink-tests for this vs.
> >>> having a separate module for this purpose?
> >>>
> >>>
> >>> Ingo
> >>>
> >>> On Mon, Sep 6, 2021 at 3:04 PM Ingo Bürk  wrote:
> >>>
>  Hi Chesnay,
> 
>  Those are all great questions, and I want to tackle those as well. For the
>  moment I went per-module, but runtime-wise that isn't ideal the more
>  modules we'd activate this in. ArchUnit does cache classes between tests,
>  but if we run them individually per module, we'd still add up quite a bit
>  of execution time (a single module in my IDE is around 10s with the tests I
>  currently have implemented, but I suspect the bottleneck here is the
>  importing of classes, not the number of tests). Ideally we'd just run them
>  once in a module with a big enough classpath to cover everything. If we
>  have such a place, that would probably be our best shot. I'll also keep
>  investigating here, of course.
> 
>  For now I just pushed a solution to avoid the overlap when executing it
>  per-module by matching on the URI. It's not the prettiest solution, but
>  does work; but that's more to not fail the tests in unrelated modules and
>  doesn't help much with execution time.
> 
> 
>  Ingo
> 
>  On Mon, Sep 6, 2021 at 1:57 PM Chesnay Schepler  wrote:
> 
> > Do you have an estimate for how long these tests would run?
> >
> > For project-wide tests, what are the options for setting that up?
> > If we let the tests run per-module then I guess they'd overlap
> > considerably (because other Flink modules are being put on the
> > classpath), which isn't ideal.
> >
> > On 06/09/2021 13:51, David Morávek wrote:
> >> Hi Ingo,
> >>
> >> +1 for this effort. This could automate a lot of "written rules" that are
> >> easy to forget about / not to be aware of (such as that each test should
> >> extend the TestLogger as Till has already mentioned).
> >>
> >> I went through your examples and ArchUnit looks really powerful and
> >> expressive while still being easy to read.
> >>
> >> Best,
> >> D.
> >>
> >> On Mon, Sep 6, 2021 at 1:00 PM Ingo Bürk  wrote:
> >>
> >>> Thanks for your input Chesnay!
> >>>
> >>> The limitations of ArchUnit probably mostly stem from the fact that it
> >>> operates on byte code and thus can't access anything not accessible from
> >>> byte code, i.e. JavaDocs. But I think Checkstyle and ArchUnit are
> >>> complementing each other quite well here. The main reason against
> >>> Checkstyle for these tests is its limitation to single files only,
> >>> rendering many tests (including the one you mentioned) impossible. The
> >>> secondary reason is that ArchUnit has more declarative APIs and the tests
> >>> become quite easy to write and maintain (some groundwork effort is
> >>> needed, of course). Over time we could probably expand quite a bit more
> >>> on what is tested with ArchUnit as it can test entire architectures (package
> 

[jira] [Created] (FLINK-24226) Improve error message when trying Presto S3 FS with recoverable writer

2021-09-09 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-24226:


 Summary: Improve error message when trying Presto S3 FS with 
recoverable writer
 Key: FLINK-24226
 URL: https://issues.apache.org/jira/browse/FLINK-24226
 Project: Flink
  Issue Type: Technical Debt
  Components: FileSystems
Reporter: Chesnay Schepler
 Fix For: 1.15.0


When you use the Presto S3 FileSystem with recoverable writers, you get this 
exception:

{code}
java.lang.UnsupportedOperationException: This s3 file system implementation 
does not support recoverable writers.
at 
org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136)
at 
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:260)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:396)
{code}

While this is technically _correct_, it's not really helpful to a user. It does 
not propose a course of action (e.g., "Use the Hadoop S3 FileSystem instead"), 
which is particularly important because this limitation isn't documented.
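
One possible shape for the improved message (a sketch; the exact wording is 
open):

{code:java}
// In FlinkS3FileSystem (Presto-based implementation); sketch only.
@Override
public RecoverableWriter createRecoverableWriter() {
    throw new UnsupportedOperationException(
            "This s3 file system implementation does not support recoverable writers. "
                    + "Recoverable writers on S3 are only supported by the Hadoop-based "
                    + "file system ('flink-s3-fs-hadoop'). Switch the plugin from "
                    + "'flink-s3-fs-presto' to 'flink-s3-fs-hadoop' for the StreamingFileSink.");
}
{code}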





[jira] [Created] (FLINK-24225) Define and implement UPSERT INTO semantics

2021-09-09 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-24225:
---

 Summary: Define and implement UPSERT INTO semantics
 Key: FLINK-24225
 URL: https://issues.apache.org/jira/browse/FLINK-24225
 Project: Flink
  Issue Type: Bug
Reporter: Francesco Guardiani


In https://issues.apache.org/jira/browse/FLINK-22942, UPSERT INTO was 
disabled. We should define the correct behaviour for UPSERT INTO and 
implement it.





[jira] [Created] (FLINK-24224) Table to stream: only a Row datatype stream works with CEP; POJO, Map and JSONObject datatype streams do not, although any datatype stream works with CEP via the Stream API alone

2021-09-09 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-24224:


 Summary: Table to stream: only a Row datatype stream works with 
CEP; POJO, Map and JSONObject datatype streams do not, although any datatype 
stream works with CEP via the Stream API alone.
 Key: FLINK-24224
 URL: https://issues.apache.org/jira/browse/FLINK-24224
 Project: Flink
  Issue Type: Bug
  Components: Library / CEP
Affects Versions: 1.13.2, 1.12.0, 1.11.2
Reporter: YUJIANBO


 

1. Problem: when converting a *Table to a stream*, only a *Row* datatype 
stream works with *CEP*; POJO, Map and JSONObject datatype streams do not, 
although any datatype stream works with CEP via the Stream API alone.

2. Version: I have tried three versions: 1.11.2, 1.12.0 and 1.13.2.

3. Code:

(1) Table to stream to CEP (only the Row datatype is OK; with other datatypes 
nothing is printed from CEP and there is no error message)
{code:java}
tableEnv.executeSql(creat_kafka_source);
tableEnv.executeSql(calculateSql);

Table tb = tableEnv.from("calculateSql");
String[] fieldNames = tb.getSchema().getFieldNames();
DataType[] fieldDataTypes = tb.getSchema().getFieldDataTypes();

KeyedStream<JSONObject, String> ds = tableEnv
        .toAppendStream(tb, Row.class)
        .map(new RichMapFunction<Row, JSONObject>() {
            Map<String, Object> map = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                if (null == map) {
                    map = new HashMap<>();
                }
            }

            @Override
            public JSONObject map(Row value) throws Exception {
                // add the field names and values of the row to the map
                RowParseUtil.setFieldAndValue2Map(map, fieldDataTypes, fieldNames, value);
                JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(map));
                map.clear();
                return jsonObject;
            }
        })
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("wStart") * 1000;
                    }
                })
        .keyBy(x -> x.getString("x_forwarded_for"));
// it has data to print
ds.print();

Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("begin")
        .where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                log.info("===>" + value);
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> patternStream = CEP.pattern(ds, pattern);
// it has no data to print
patternStream.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> match, Context ctx,
            Collector<String> out) throws Exception {
        out.collect("==>>>" + match.toString());
    }
}).print();
{code}
(2) *Only Stream API to CEP* (any datatype is OK)
{code:java}
Properties proPs = kafkaUtil.getReceiveKfkProPs(receive_brokers, groupid);
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
        new FlinkKafkaConsumer<>(receive_topic, new KafkaRecordSchema(), proPs);
consumer.setStartFromEarliest();

SingleOutputStreamOperator<JSONObject> input = env.addSource(consumer)
        .map(x -> {
            return JSON.parseObject(x.value());
        })
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.milliseconds(10)) {
                    @Override
                    public long extractTimestamp(JSONObject element) {
                        return element.getLongValue("ts");
                    }
                })
        .keyBy(x -> x.getString("x_forwarded_for") + x.getString("request_uri"))
        .timeWindow(Time.seconds(1))
        .apply(new WindowFunction<JSONObject, JSONObject, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<JSONObject> input,
                    Collector<JSONObject> out) throws Exception {
                Iterator<JSONObject> iterator = input.iterator();
                ArrayList<JSONObject> list = new ArrayList<>();
                int n = 0;
                while (iterator.hasNext()) {
                    n++;
                    JSONObject next = iterator.next();
                    list.add(next);
                }
                JSONObject jsonObject = list.get(0);
                jsonObject.put("ct", n);
                jsonObject.remove("ts");
                out.collect(jsonObject);
            }
        });

input.print();

// it is ok
Pattern<JSONObject, JSONObject> minInterval = Pattern
        .<JSONObject>begin("begin").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return true;
            }
        }).timesOrMore(1).within(Time.seconds(10));

PatternStream<JSONObject> pattern = CEP.pattern(input, minInterval);
pattern.process(new PatternProcessFunction<JSONObject, String>() {
    @Override
    public void processMatch(Map<String, List<JSONObject>> map, Context context,
            Collector<String> out)

[jira] [Created] (FLINK-24223) Client should throw an exception to warn users when the configurations set by program options conflict with those set by -D

2021-09-09 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-24223:
-

 Summary: Client should throw an exception to warn users when the 
configurations set by program options conflict with those set by -D
 Key: FLINK-24223
 URL: https://issues.apache.org/jira/browse/FLINK-24223
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.13.0, 1.12.0, 1.11.0
Reporter: Zhanghao Chen


h2. Problem

Currently, program options (e.g. -d, -p) have higher precedence than 
configuration options set by -D or -yD on the client side. This may confuse 
users, especially for program options without arguments. For example, if a 
user sets -Dexecution.attached=false without setting -d (they may not be aware 
that this option exists), they will find that the configuration value does not 
take effect.
h2. Proposal

The client should throw an exception to warn users when the configuration set 
via program options conflicts with the one set via -D.
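
A hypothetical sketch of the proposed client-side check; the class, method 
and message are illustrative, not existing Flink client code:

{code:java}
import java.util.Map;

public final class OptionConflictChecker {

    /**
     * Throws if a value set via -D/-yD conflicts with the value implied by a
     * program option (e.g. "execution.attached" vs. the -d flag).
     */
    public static void checkNoConflict(
            Map<String, String> dynamicProperties,
            String key,
            String programOptionValue) {
        String dynamicValue = dynamicProperties.get(key);
        if (dynamicValue != null && !dynamicValue.equals(programOptionValue)) {
            throw new IllegalArgumentException(
                    "Conflicting settings for '" + key + "': the program option sets '"
                            + programOptionValue + "' while -D sets '" + dynamicValue + "'.");
        }
    }
}
{code}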


