[jira] [Created] (FLINK-22170) Manual test for hive dialect

2021-04-08 Thread Rui Li (Jira)
Rui Li created FLINK-22170:
--

 Summary: Manual test for hive dialect
 Key: FLINK-22170
 URL: https://issues.apache.org/jira/browse/FLINK-22170
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Rui Li
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22169) Beautify the CliTableauResultView when print

2021-04-08 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-22169:
-

 Summary: Beautify the CliTableauResultView when print
 Key: FLINK-22169
 URL: https://issues.apache.org/jira/browse/FLINK-22169
 Project: Flink
  Issue Type: Bug
Reporter: Shengkai Fang


In batch mode, the printed output is not the same as before.





[jira] [Created] (FLINK-22168) Partition insert with union all will fail

2021-04-08 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22168:


 Summary: Partition insert with union all will fail
 Key: FLINK-22168
 URL: https://issues.apache.org/jira/browse/FLINK-22168
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.13.0


INSERT INTO partitioned_sink (e,a,g,f,c,d) SELECT e,a,456,123,c,d FROM MyTable 
GROUP BY a,b,c,d,e UNION ALL SELECT e,a,789,456,c,d FROM MyTable GROUP BY 
a,b,c,d,e





[jira] [Created] (FLINK-22166) Empty values with sort will fail

2021-04-08 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22166:


 Summary: Empty values with sort will fail
 Key: FLINK-22166
 URL: https://issues.apache.org/jira/browse/FLINK-22166
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.13.0


SELECT * FROM (VALUES 1, 2, 3) AS T (a) WHERE a = 1 and a = 2 ORDER BY a





[jira] [Created] (FLINK-22167) Partial insert not works when complex fields reorder

2021-04-08 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22167:


 Summary: Partial insert not works when complex fields reorder
 Key: FLINK-22167
 URL: https://issues.apache.org/jira/browse/FLINK-22167
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.13.0


INSERT INTO sink (b,e,a,g,f,c,d) SELECT b,e,a,456,123,c,d FROM MyTable GROUP BY 
a,b,c,d,e





[jira] [Created] (FLINK-22165) How to set rabbitmq correlationId when using rabbitmq sink in dataStreamEnv

2021-04-08 Thread Spongebob (Jira)
Spongebob created FLINK-22165:
-

 Summary: How to set rabbitmq correlationId when using rabbitmq 
sink in dataStreamEnv
 Key: FLINK-22165
 URL: https://issues.apache.org/jira/browse/FLINK-22165
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Affects Versions: 1.12.2
 Environment: Flink 1.12.2

rabbitmq 3.8.4
Reporter: Spongebob


The Flink RabbitMQ module provides source and sink functions for RabbitMQ. The 
correlationId can be used to deduplicate records across checkpoints, so can we 
set a correlationId on each message that the sink writes to RabbitMQ?
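
To make the request concrete, here is a minimal sketch of the idea: a producer tags every message with a unique correlation id, and a consumer drops any message whose id it has already seen (for example after a replay). It is plain Python and does not use the Flink or RabbitMQ APIs; the names `attach_correlation_ids` and `deduplicate` are hypothetical and exist only for illustration.

```python
import uuid


def attach_correlation_ids(payloads, id_factory=lambda: str(uuid.uuid4())):
    """Pair every payload with a freshly generated correlation id."""
    return [(id_factory(), payload) for payload in payloads]


def deduplicate(messages):
    """Keep only the first message seen for each correlation id.

    `messages` is an iterable of (correlation_id, payload) pairs.
    """
    seen = set()
    unique = []
    for correlation_id, payload in messages:
        if correlation_id in seen:
            continue  # replayed message, already processed once
        seen.add(correlation_id)
        unique.append(payload)
    return unique
```

A sink that cannot set the id leaves downstream consumers without a key to deduplicate on, which is why the option is being asked for.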





[jira] [Created] (FLINK-22164) Add jobId and JobName variable to JobManager metrics in per-job deployment mode

2021-04-08 Thread Lu Niu (Jira)
Lu Niu created FLINK-22164:
--

 Summary: Add jobId and JobName variable to JobManager metrics in 
per-job deployment mode
 Key: FLINK-22164
 URL: https://issues.apache.org/jira/browse/FLINK-22164
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Lu Niu


We expose all Flink metrics to an external system for monitoring and alerting. 
However, JobManager metrics only have one variable, which is not 
enough to identify a single job when the job is deployed to YARN. If a Flink job 
runs in per-job mode, which ensures one job per cluster, we can add jobId and 
JobName to the JobManager metrics. 





Flink does not cleanup some disk memory after submitting jar over rest

2021-04-08 Thread Great Info
I have deployed my own Flink setup in AWS ECS, with one service for the
JobManager and one service for the TaskManagers. I am running one ECS task for
the JobManager and 3 ECS tasks for the TaskManagers.

I have a batch-style job that I upload through the Flink REST API every day
with new arguments. Each time I submit it, disk usage increases by about
600 MB. Checkpoints are stored in S3, and I have also set
*historyserver.archive.clean-expired-jobs* to true.

Since I am running on ECS, I am not able to find out why the usage increases
on every jar upload and execution.

Which Flink config params should I look at to make sure the usage does not
keep growing?


[jira] [Created] (FLINK-22163) DataTypes implementation for flink-csv with ZonedDateTime support

2021-04-08 Thread Jira
François Lacombe created FLINK-22163:


 Summary: DataTypes implementation for flink-csv with ZonedDateTime 
support
 Key: FLINK-22163
 URL: https://issues.apache.org/jira/browse/FLINK-22163
 Project: Flink
  Issue Type: Wish
  Components: Table SQL / Ecosystem
Affects Versions: 1.12.1
Reporter: François Lacombe


Dear maintainers

flink-csv's `CsvRowSerializationSchema` currently relies on legacy 
TypeInformation to serialize Java types.
It doesn't support ZonedDateTime (only LocalDateTime with LOCAL_DATE_TIME).

An exception is thrown when CsvRowSerializationSchema is built with a 
RowTypeInformation containing a `TypeInformation.of(ZonedDateTime.class)`. See 
`CsvRowSchemaConverter` line 199.

A quick fix would be to use `toString()` by default in the serializer instead 
of throwing an exception. Apache CSV already does it this way.

 

I'm using flink-csv and Apache Flink 1.12.1

Am I missing anything regarding a more recent flink-csv implementation?

If not, is it planned to move flink-csv to DataTypes types (with 
DataTypes.TIMESTAMP_WITH_TIME_ZONE support) ?

 

All the best





[jira] [Created] (FLINK-22162) Make Max Operator name Length Configurable

2021-04-08 Thread Lu Niu (Jira)
Lu Niu created FLINK-22162:
--

 Summary: Make Max Operator name Length Configurable
 Key: FLINK-22162
 URL: https://issues.apache.org/jira/browse/FLINK-22162
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Lu Niu


MaxOperatorNameLength is hardcoded to 80. Users might want to tune this 
parameter so that, after exposing metrics to an external metrics system, they 
can better query the metrics data by name. 
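
As a rough illustration of the requested behaviour, the sketch below truncates an operator name to a configurable maximum instead of a fixed 80. It is plain Python, not Flink code; the function name `metric_operator_name` and the parameter `max_length` are hypothetical and only stand in for whatever config option would be introduced.

```python
DEFAULT_MAX_OPERATOR_NAME_LENGTH = 80  # the currently hardcoded value


def metric_operator_name(name, max_length=DEFAULT_MAX_OPERATOR_NAME_LENGTH):
    """Return the operator name as it would appear in a metric identifier.

    Names longer than `max_length` characters are cut off; shorter names
    are returned unchanged.
    """
    if max_length <= 0:
        raise ValueError("max_length must be positive")
    return name if len(name) <= max_length else name[:max_length]
```

With a larger limit, long operator names would survive intact in the external metrics system and could be matched by their full name.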





[jira] [Created] (FLINK-22161) 'Run Mesos WordCount test' failed on Azure

2021-04-08 Thread Jark Wu (Jira)
Jark Wu created FLINK-22161:
---

 Summary: 'Run Mesos WordCount test' failed on Azure
 Key: FLINK-22161
 URL: https://issues.apache.org/jira/browse/FLINK-22161
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Mesos
Reporter: Jark Wu
 Fix For: 1.13.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16210=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=ff888d9b-cd34-53cc-d90f-3e446d355529

{code}
Apr 08 14:24:10 Step 1/2 : FROM ubuntu:xenial
Head https://registry-1.docker.io/v2/library/ubuntu/manifests/xenial: received 
unexpected HTTP status: 500 Internal Server Error
Apr 08 14:24:12 Command: build_image failed. Retrying...
Apr 08 14:24:12 Building Mesos Docker container
Apr 08 14:24:12 Sending build context to Docker daemon  6.144kB

Apr 08 14:24:12 Step 1/2 : FROM ubuntu:xenial
Head https://registry-1.docker.io/v2/library/ubuntu/manifests/xenial: received 
unexpected HTTP status: 500 Internal Server Error
Apr 08 14:24:13 Command: build_image failed. Retrying...
Apr 08 14:24:13 Command: build_image failed 5 times.

Apr 08 14:24:13 ERROR: Could not build mesos image. Aborting...
Error: No such container: mesos-master
The MVN_REPO variable is not set. Defaulting to a blank string.
Removing network docker-mesos-cluster-network
Network docker-mesos-cluster-network not found.
Apr 08 14:24:14 [FAIL] Test script contains errors.
Apr 08 14:24:14 Checking for errors...
Apr 08 14:24:14 No errors in log files.
Apr 08 14:24:14 Checking for exceptions...
Apr 08 14:24:14 No exceptions in log files.
Apr 08 14:24:14 Checking for non-empty .out files...
grep: 
/home/vsts/work/1/s/flink-dist/target/flink-1.13-SNAPSHOT-bin/flink-1.13-SNAPSHOT/log/*.out:
 No such file or directory
Apr 08 14:24:14 No non-empty .out files.
Apr 08 14:24:14 
Apr 08 14:24:14 [FAIL] 'Run Mesos WordCount test' failed after 0 minutes and 10 
seconds! Test exited with exit code 1
Apr 08 14:24:14 
{code}





[jira] [Created] (FLINK-22160) Test Window TVF based aggregation and TopN

2021-04-08 Thread Jark Wu (Jira)
Jark Wu created FLINK-22160:
---

 Summary: Test Window TVF based aggregation and TopN
 Key: FLINK-22160
 URL: https://issues.apache.org/jira/browse/FLINK-22160
 Project: Flink
  Issue Type: Test
  Components: Table SQL / API
Reporter: Jark Wu
 Fix For: 1.13.0


In FLINK-19604 
([FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function]),
 we introduced a new syntax to express Window Aggregate and Window TopN. For 
Window Aggregate, we have also introduced a new window kind: cumulate windows. 

The scope of this task is to make sure:

1. The old window aggregate syntax ({{GROUP BY TUMBLE(...)}}) can be rewritten 
using the new syntax and gets the same results. Note that session windows are 
not supported yet in the new syntax.
2. Verify the new CUMULATE window works as expected.
3. Verify the new Window TopN works as expected.
4. Failure, recovery, and rescale cases: results are still correct.
5. Window emitting: a window should be fired once the watermark advances past 
the window end (we can manually generate source data with monotonically and 
slowly increasing timestamps).
6. The feature is well documented.


Note: the documentation for this feature is still in progress (FLINK-22159). To 
test the feature, we can use the FLIP documentation as a guide for 
now. 
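
To make the CUMULATE semantics in item 2 concrete, here is a small sketch of how a timestamp could be assigned to cumulate windows, based on the description in FLIP-145: all windows in one cycle share the same start and grow by one step until they reach the maximum size. It is plain Python rather than Flink code, timestamps are plain integers, and the function name `cumulate_windows` is made up for illustration.

```python
def cumulate_windows(ts, step, max_size):
    """Return the [start, end) cumulate windows that contain timestamp `ts`.

    Windows of one cycle share the same start (aligned to `max_size`) and
    end at start + step, start + 2 * step, ..., start + max_size.
    """
    if step <= 0 or max_size <= 0 or max_size % step != 0:
        raise ValueError("max_size must be a positive multiple of step")
    start = ts - ts % max_size
    # Smallest window end that is strictly greater than ts.
    first_end = start + ((ts - start) // step + 1) * step
    return [(start, end) for end in range(first_end, start + max_size + 1, step)]
```

For example, with a step of 60 and a maximum size of 180, a row at timestamp 10 falls into the windows [0, 60), [0, 120) and [0, 180), while a row at timestamp 130 only falls into [0, 180).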








Dev process: testing one suite only (and are our docs suggesting Java 8 accurate?)

2021-04-08 Thread Adam Roberts
Hey everyone, I'm looking to get the full set of unit tests working using
AdoptOpenJDK 11 with the OpenJ9 VM and I'm basically seeing problems with
the runtime tests (always going OoM creating new threads) and I'd also like
to have a go at https://issues.apache.org/jira/browse/FLINK-21672.

That being said... how do I run just the one test, or a set of tests in the
one package?

What are you doing to achieve this?

For Apache Spark I remember using mvn -fn -DwildcardSuites=org.apache.spark
test (the suite name), but with Flink that doesn't give me what I want
(lots more tests run, it's like the option is ignored - this was several
years ago now though).

I've also tried using Maven's ! directive but to no avail, I've been
through and tried
https://maven.apache.org/surefire/maven-surefire-plugin/examples/single-test.html,
and I've also tried mvn -Dtest=org.apache.flink.runtime* -fn test

I'm wondering if anyone has an awesome example and could potentially add it
to
https://cwiki.apache.org/confluence/display/FLINK/Setting+up+a+Flink+development+environment
as well please.

While I'm here... I did notice as well that we mention Java 8 - I assume
this can be Java 8 *or* 11? Or should it just say 11?

Any thoughts/suggestions would be awesome, thanks!


[jira] [Created] (FLINK-22159) Add documentation for the new window TVF based operations

2021-04-08 Thread Jark Wu (Jira)
Jark Wu created FLINK-22159:
---

 Summary: Add documentation for the new window TVF based operations
 Key: FLINK-22159
 URL: https://issues.apache.org/jira/browse/FLINK-22159
 Project: Flink
  Issue Type: Sub-task
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.13.0


In version 1.13, we added support for the window TVF based aggregation and TopN 
from FLIP-145. We should add documentation for them. We may also need to 
restructure the "Queries" page.






[jira] [Created] (FLINK-22158) Test native Kubernetes pod template

2021-04-08 Thread Yang Wang (Jira)
Yang Wang created FLINK-22158:
-

 Summary: Test native Kubernetes pod template
 Key: FLINK-22158
 URL: https://issues.apache.org/jira/browse/FLINK-22158
 Project: Flink
  Issue Type: Test
  Components: Deployment / Kubernetes
Affects Versions: 1.13.0
Reporter: Yang Wang
 Fix For: 1.13.0


Flink allows users to define the JobManager and TaskManager pods via template 
files. This makes it possible to support advanced features (e.g. init-container, 
sidecar container, volume mount, etc.) that are not supported by Flink 
Kubernetes config options directly. Use {{kubernetes.pod-template-file}} to 
specify a local file that contains the pod definition. It will be used to 
initialize the JobManager and TaskManager.

 

The documentation on how to start a session/application cluster with a pod 
template can be found here [1].

 

[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template





[jira] [Created] (FLINK-22157) Join & Select a part of composite primary key will cause ArrayIndexOutOfBoundsException

2021-04-08 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-22157:
---

 Summary: Join & Select a part of composite primary key will cause 
ArrayIndexOutOfBoundsException
 Key: FLINK-22157
 URL: https://issues.apache.org/jira/browse/FLINK-22157
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: Caizhi Weng
 Fix For: 1.13.0


Add the following test case to 
{{org.apache.flink.table.planner.plan.stream.sql.join.JoinTest}} to reproduce 
this bug.

{code:scala}
@Test
def myTest(): Unit = {
  util.tableEnv.executeSql(
    """
      |CREATE TABLE MyTable (
      |  pk1 INT,
      |  pk2 BIGINT,
      |  PRIMARY KEY (pk1, pk2) NOT ENFORCED
      |) WITH (
      |  'connector'='values'
      |)
      |""".stripMargin)
  util.verifyExecPlan(
    "SELECT A.a1 FROM A LEFT JOIN MyTable ON A.a1 = MyTable.pk1")
}
{code}

The exception stack is
{code}
java.lang.RuntimeException: Error while applying rule StreamPhysicalJoinRule, 
args [rel#141:FlinkLogicalJoin.LOGICAL.any.None: 
0.[NONE].[NONE](left=RelSubset#139,right=RelSubset#140,condition==($0, 
$1),joinType=left), rel#138:FlinkLogicalCalc.LOGICAL.any.None: 
0.[NONE].[NONE](input=RelSubset#137,select=a1), 
rel#121:FlinkLogicalTableSourceScan.LOGICAL.any.None: 
0.[NONE].[NONE](table=[default_catalog, default_database, MyTable, 
project=[pk1]],fields=pk1)]

at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:281)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:583)
at 
org.apache.flink.table.planner.plan.stream.sql.join.JoinTest.myTest(JoinTest.scala:300)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at 

[jira] [Created] (FLINK-22156) HiveDialectQueryITCase fails on Azure because of no output for 900 seconds

2021-04-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22156:
---

 Summary: HiveDialectQueryITCase fails on Azure because of no 
output for 900 seconds
 Key: FLINK-22156
 URL: https://issues.apache.org/jira/browse/FLINK-22156
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime, Tests
Reporter: Yingjie Cao
 Fix For: 1.13.0


[https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/16105/logs/139]





[jira] [Created] (FLINK-22155) Fix EXPLAIN implementation

2021-04-08 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-22155:
-

 Summary: Fix EXPLAIN implementation
 Key: FLINK-22155
 URL: https://issues.apache.org/jira/browse/FLINK-22155
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.0
Reporter: Shengkai Fang


When explaining an INSERT statement, the validator validates the whole statement 
rather than only the query. But when executing an INSERT statement, the planner 
validates only the query part of the statement. As a result, the EXPLAIN output 
may differ from the actual plan. 





[jira] [Created] (FLINK-22154) PushFilterIntoTableSourceScanRule fails to deal with IN expressions

2021-04-08 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-22154:
---

 Summary: PushFilterIntoTableSourceScanRule fails to deal with IN 
expressions
 Key: FLINK-22154
 URL: https://issues.apache.org/jira/browse/FLINK-22154
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.0
Reporter: Caizhi Weng
 Fix For: 1.13.0


Add the following test case to {{PushFilterIntoLegacyTableSourceScanRuleTest}} 
to reproduce this bug. {{PushFilterIntoTableSourceScanRuleTest}} extends this 
class and will also be tested.

{code:scala}
@Test
def myTest(): Unit = {
  util.verifyRelPlan(
    "SELECT * FROM MyTable WHERE name IN ('Alice', 'Bob', 'Dave')")
}
{code}

The exception stack is
{code}
java.lang.AssertionError: OR(OR(=($0, _UTF-16LE'Alice':VARCHAR(5) CHARACTER SET 
"UTF-16LE"), =($0, _UTF-16LE'Bob':VARCHAR(5) CHARACTER SET "UTF-16LE")), =($0, 
_UTF-16LE'Dave':VARCHAR(5) CHARACTER SET "UTF-16LE"))

at org.apache.calcite.rel.core.Filter.<init>(Filter.java:76)
at 
org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
at 
org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
at 
org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
at 
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
at 
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:271)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:281)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
at 

[jira] [Created] (FLINK-22153) Manually test the sort-merge blocking shuffle

2021-04-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22153:
---

 Summary: Manually test the sort-merge blocking shuffle
 Key: FLINK-22153
 URL: https://issues.apache.org/jira/browse/FLINK-22153
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


In 1.12, we introduced the sort-merge blocking shuffle to Flink. In 1.13, the 
feature was optimized, which improves usability (fixing the direct memory OOM 
issue) and performance (introducing IO scheduling and broadcast optimization).

The sort-merge blocking shuffle can be tested by following the process below:
 # Write a simple batch job using either the SQL/Table or DataStream API (word 
count should be enough);
 # Enable the sort-merge blocking shuffle by setting 
taskmanager.network.sort-shuffle.min-parallelism to 1 in the Flink 
configuration file;
 # Submit and run the batch job with different parallelism and data volumes;
 # Tune the relevant config options 
(taskmanager.network.blocking-shuffle.compression.enabled, 
taskmanager.network.sort-shuffle.min-buffers, 
taskmanager.memory.framework.off-heap.batch-shuffle.size) and observe the 
effect. 

 





[jira] [Created] (FLINK-22152) Fix the bug of same timers are registered multiple times

2021-04-08 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-22152:


 Summary: Fix the bug of same timers are registered multiple times
 Key: FLINK-22152
 URL: https://issues.apache.org/jira/browse/FLINK-22152
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Huang Xingbo
Assignee: Huang Xingbo
 Fix For: 1.13.0


The same timer can be registered multiple times. We need to deduplicate 
identical timers.
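
One common way to deduplicate timers is to keep a set of registered (timestamp, key) pairs next to the priority queue and to skip any registration that is already in the set. The sketch below shows that idea in plain Python; it is not the PyFlink implementation, and the class `TimerQueue` and its methods are hypothetical names used only for illustration.

```python
import heapq


class TimerQueue:
    """Priority queue of timers that ignores duplicate registrations."""

    def __init__(self):
        self._heap = []           # (timestamp, key) pairs ordered by timestamp
        self._registered = set()  # the same pairs, for O(1) duplicate checks

    def register(self, key, timestamp):
        """Register a timer; return False if the same timer already exists."""
        timer = (timestamp, key)
        if timer in self._registered:
            return False
        self._registered.add(timer)
        heapq.heappush(self._heap, timer)
        return True

    def pop_due(self, watermark):
        """Remove and return all timers with timestamp <= watermark."""
        due = []
        while self._heap and self._heap[0][0] <= watermark:
            timer = heapq.heappop(self._heap)
            self._registered.discard(timer)
            due.append(timer)
        return due
```

Because a fired timer is removed from the set as well, the same (timestamp, key) pair can be registered again later without being mistaken for a duplicate.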






Re: [DISCUSS] Backport FLIP-27 Kafka source connector fixes with API change to release-1.12.

2021-04-08 Thread Becket Qin
Hi Arvid,

There are interface changes to the Kafka source, and there is a backwards
compatible change in the base source implementation. Therefore technically
speaking, users might be able to run the Kafka source in 1.13 with a 1.12
Flink job. However, it could be tricky because there might be some
dependent jar conflicts between 1.12 and 1.13. So this solution seems a
little fragile.

> I'd second Till's question if there is an issue for users that start with
> the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka source
> with API changes.


Just to clarify, the bug fixes themselves include API changes; they are not
separable. So we basically have three options here:

1. Do not backport fixes in 1.12. So users have to upgrade to 1.13 in order
to use the new Kafka source.
2. Write some completely different fixes for release 1.12 and ask users to
migrate to the new API when they upgrade to 1.13
3. Backport the fix with API changes to 1.12. So users don't need to handle
interface change when they upgrade to 1.13+.

Personally I think option 3 here is better because it does not really
introduce any trouble to the users. The downside is that we do need to
change the API of Kafka source in 1.12. Given that the changed API won't be
useful without these bug fixes, changing the API seems to be doing more
good than bad here.

Thanks,

Jiangjie (Becket) Qin



On Thu, Apr 8, 2021 at 2:39 PM Arvid Heise  wrote:

> Hi Becket,
>
> did you need to change anything to the source interface itself? Wouldn't it
> be possible for users to simply use the 1.13 connector with their Flink
> 1.12 deployment?
>
> I think the late-upgrade argument can be made for any feature, but I also
> see that the Kafka connector is of high interest.
>
> I'd second Till's question if there is an issue for users that start with
> the current Kafka source (+bugfixes) to later upgrade to 1.13 Kafka source
> with API changes.
>
> Best,
>
> Arvid
>
> On Thu, Apr 8, 2021 at 2:54 AM Becket Qin  wrote:
>
> > Thanks for the comment, Till and Thomas.
> >
> > As far as I know there are some users who have just upgraded their Flink
> > version from 1.8 / 1.9 to Flink 1.12 and might not upgrade the version in 6
> > months or more. There are also some organizations that have the strategy of
> > not running the latest version of a project, but only the second latest
> > version with bug fixes. So those users may still benefit from the backport.
> > However, arguably the old Kafka source is there anyways in 1.12, so they
> > should not be blocked on having the new source.
> >
> > I am leaning towards backporting the fixes mainly because this way we might
> > have more users migrating to the new Source and provide feedback. It will
> > take some time for the users to pick up 1.13, especially for the users
> > running Flink at large scale. So backporting the fixes to 1.12 would help
> > get the new source to be used sooner.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Apr 8, 2021 at 12:40 AM Thomas Weise  wrote:
> >
> > > Hi,
> > >
> > > Thanks for fixing the new KafkaSource issues.
> > >
> > > I'm interested in using these fixes with 1.12 for experimental purposes.
> > >
> > > +1 for backporting. 1.12 is the current stable release and users who would
> > > like to try the FLIP-27 sources are likely to use that release.
> > >
> > > Thomas
> > >
> > > On Wed, Apr 7, 2021 at 2:50 AM Till Rohrmann wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > If I remember correctly, we deliberately did not document the Kafka
> > > > connector in the 1.12 release. Hence, from this point there should be no
> > > > need to backport any fixes because users are not aware of this feature.
> > > >
> > > > On the other hand this also means that we should be able to break anything
> > > > we want to. Consequently, backporting these fixes should be possible.
> > > >
> > > > The question would probably be whether we want to ship new features with a
> > > > bug fix release. Do we know of any users who want to use the new Kafka
> > > > source, are using the 1.12 version and cannot upgrade to 1.13 once it is
> > > > released? If this is the case, then this could be an argument for shipping
> > > > this feature with a bug fix release. If not, then we could save some work
> > > > by not backporting it.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Apr 7, 2021 at 10:43 AM Becket Qin wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I'd like to start a discussion thread about backporting some FLIP-27 Kafka
> > > > > source connector fixes to release-1.12. These fixes include some API
> > > > > changes and thus need a public discussion.
> > > > >
> > > > > The tickets in question are following:
> > > > > https://issues.apache.org/jira/browse/FLINK-20379
> > > > > https://issues.apache.org/jira/browse/FLINK-20114
> > > > > 

[jira] [Created] (FLINK-22151) Implement type inference for agg functions

2021-04-08 Thread Timo Walther (Jira)
Timo Walther created FLINK-22151:


 Summary: Implement type inference for agg functions
 Key: FLINK-22151
 URL: https://issues.apache.org/jira/browse/FLINK-22151
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Update avg, count, min, max, sum, sum0, stddevPop, stddevSamp, varPop, varSamp.





Re: Zigzag shape in TM JVM used memory

2021-04-08 Thread Piotr Nowojski
Hi,

I don't think there is a Flink specific answer to this question. Just do
what you would normally do with a normal Java application running inside a
JVM. If there is an OOM on heap space, you can either try to bump the heap
space, or reduce usage of it. The only Flink specific part is probably that
you need to leave enough memory for the framework itself, and that there
are a couple of different memory pools. You can read about those things in
the docs:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_tuning.html
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_trouble.html

Piotrek



On Thu, 8 Apr 2021 at 02:19, Lu Niu wrote:

> Hi, Piotr
>
> Thanks for replying. I asked this because such a pattern might imply memory
> oversubscription. For example, I tuned down the memory of one app from heap
> 2.63GB to 367MB and the job still runs fine:
> before:
>
> https://drive.google.com/file/d/1o8k9Vv3yb5gXITi4GvmlXMteQcRfmOhr/view?usp=sharing
>
> after:
>
> https://drive.google.com/file/d/1wNTHBT8aSJaAmL1rVY8jUkdp-G5znnMN/view?usp=sharing
>
>
> What's the best practice for tuning Flink job memory?
>
> 1. What’s a good starting point that users should try first?
> 2. How to make progress? E.g. Flink application Foo currently hits the
> error "OOM: Java heap space". Where to go next? Simply bump up
> taskmanager.memory? Or just increase the heap?
> 3. What’s the final state? Job running fine and ensuring XYZ headroom in
> each memory component?
>
> Best
> Lu
>
> On Tue, Apr 6, 2021 at 12:26 AM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > this should be posted on the user mailing list not the dev.
> >
> > Apart from that, this looks like normal/standard behaviour of the JVM and
> > has very little to do with Flink. The Garbage Collector (GC) kicks in when
> > memory usage is approaching some threshold:
> > https://www.google.com/search?q=jvm+heap+memory+usage=isch
> >
> > Piotrek
> >
> >
> > On Mon, 5 Apr 2021 at 22:54, Lu Niu wrote:
> >
> > > Hi,
> > >
> > > we need to update our email system then :) . Here are the links:
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1lZ5_P8_NqsN1JeLzutGj4DxkyWJN75mR/view?usp=sharing
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1J6c6rQJwtDp1moAGlvQyLQXTqcuG4HjL/view?usp=sharing
> > >
> > >
> > >
> >
> https://drive.google.com/file/d/1-R2KzsABC471AEjkF5qTm5O3V47cpbBV/view?usp=sharing
> > >
> > > All are DataStream jobs.
> > >
> > > Best
> > > Lu
> > >
> > > On Sun, Apr 4, 2021 at 9:17 PM Yun Gao  wrote:
> > >
> > > >
> > > > Hi Lu,
> > > >
> > > > The image does not seem to be shown due to the mail server
> > > > limitation. Could you upload it somewhere and paste the link here?
> > > >
> > > > Logically, I think the zigzag is usually caused by small objects being
> > > > created and discarded shortly afterwards on the heap. Are you running a
> > > > SQL job or a DataStream job?
> > > >
> > > > Best,
> > > > Yun
> > > >
> > > > --
> > > > Sender:Lu Niu
> > > > Date:2021/04/05 12:06:24
> > > > Recipient:dev@flink.apache.org
> > > > Theme:Zigzag shape in TM JVM used memory
> > > >
> > > > Hi, Flink dev
> > > >
> > > > We observed that the TM JVM used memory metric shows a zigzag shape
> > > > across many of our applications, although these applications are quite
> > > > different in business logic. The upper bound is close to the max heap
> > > > size. Is this expected in Flink applications? Or does Flink internally
> > > > aggressively pre-allocate memory?
> > > >
> > > > app1
> > > > [image: Screen Shot 2021-04-04 at 8.46.45 PM.png]
> > > > app2
> > > > [image: Screen Shot 2021-04-04 at 8.45.35 PM.png]
> > > > app3
> > > > [image: Screen Shot 2021-04-04 at 8.43.53 PM.png]
> > > >
> > > > Best
> > > > Lu
> > > >
> > > >
> > >
> >
>
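
As a rough illustration of the behaviour discussed in this thread, the sketch
below allocates short-lived objects in a plain JVM and samples the used heap
through the standard MemoryMXBean. It is not Flink code and not taken from any
of the jobs mentioned above; the class name HeapSawtooth, the method
sampleUsedHeap and the round/allocation counts are made up for the example.
Whether and how often the used heap drops depends on the JVM, the collector
and the heap size.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
import java.util.List;

public class HeapSawtooth {

    /** Allocates short-lived garbage and records the used heap (bytes) after each round. */
    static List<Long> sampleUsedHeap(int rounds, int allocationsPerRound) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        List<Long> samples = new ArrayList<>();
        long sink = 0; // consumed below so the allocations are not optimized away
        for (int round = 0; round < rounds; round++) {
            for (int i = 0; i < allocationsPerRound; i++) {
                byte[] garbage = new byte[1024]; // unreachable right after this iteration
                sink += garbage.length;
            }
            samples.add(memory.getHeapMemoryUsage().getUsed());
        }
        if (sink < 0) {
            throw new IllegalStateException("allocation counter overflowed");
        }
        return samples;
    }

    public static void main(String[] args) {
        // getMax() may report -1 if the maximum heap size is undefined.
        long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        System.out.println("max heap (bytes): " + maxHeap);
        for (long used : sampleUsedHeap(50, 100_000)) {
            System.out.println("used heap (bytes): " + used);
        }
    }
}
```

Running it with a small heap (for example -Xmx64m) and plotting the printed
values should make it easier to see used memory climbing towards the limit and
falling back after a collection.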


[jira] [Created] (FLINK-22150) Test user-defined window in Python DataStream API

2021-04-08 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-22150:


 Summary: Test user-defined window in Python DataStream API
 Key: FLINK-22150
 URL: https://issues.apache.org/jira/browse/FLINK-22150
 Project: Flink
  Issue Type: Test
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Huang Xingbo
Assignee: Huang Xingbo
 Fix For: 1.13.0


It includes but is not limited to the following testing items:
* user-defined window works well.
* performance test





[jira] [Created] (FLINK-22149) Test Python UDAF in Group Window

2021-04-08 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-22149:


 Summary: Test Python UDAF in Group Window
 Key: FLINK-22149
 URL: https://issues.apache.org/jira/browse/FLINK-22149
 Project: Flink
  Issue Type: Test
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Huang Xingbo
Assignee: Huang Xingbo
 Fix For: 1.13.0


It includes but is not limited to the following testing items:
* Python Group Window Agg in Tumbling/Sliding/Session window works well
* Performance test





Re: [VOTE] Apache Flink Stateful Functions 3.0.0, release candidate #1

2021-04-08 Thread Tzu-Li (Gordon) Tai
Thanks for voting and testing everyone!

We have a total of 6 +1 votes, 3 of which are binding:
- Igal Shilman
- Gordon Tai (binding)
- Konstantin Knauf
- Seth Wiesman
- Yu Li (binding)
- Robert Metzger (binding)

I'll proceed now with finalizing the release of StateFun 3.0.0.
The official announcement will likely happen next week, as we're finishing
up with the announcement blog post which would probably also take a few
days to be reviewed.

Thanks,
Gordon

On Thu, Apr 8, 2021 at 1:50 PM Robert Metzger  wrote:

> I see. Thanks a lot for clarifying.
>
> I then vote
>
> +1 (binding)
>
> on this release. Thanks a lot for driving this!
>
>
> On Thu, Apr 8, 2021 at 5:19 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> @Robert Metzger 
>>
>> Sorry, this is the correct link to the class file you are referring to
>> (previous link I mentioned is incorrect):
>>
>> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-common/src/main/java/com/google/protobuf/MoreByteStrings.java
>>
>> On Thu, Apr 8, 2021 at 11:17 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> @Robert Metzger 
>>>
>>> I assume the com/google/protobuf classfile you found is this one:
>>>
>>> https://github.com/apache/flink-statefun/blob/master/statefun-shaded/statefun-protobuf-shaded/src/main/java/org/apache/flink/statefun/sdk/shaded/com/google/protobuf/MoreByteStrings.java
>>>
>>> This actually isn't a class pulled from a Protobuf dependency - it's
>>> code developed under StateFun.
>>> The package com/google/protobuf was required because the class exists
>>> essentially as a workaround to access some package-private protected
>>> methods on Protobuf.
>>>
>>> I believe that in this case, a NOTICE acknowledgement is not required as
>>> we actually own that piece of code.
>>>
>>> Let me know what you think and if this clears things up!
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Thu, Apr 8, 2021 at 4:00 AM Robert Metzger 
>>> wrote:
>>>
 This jar contains a com/google/protobuf classfile, which is not
 declared in
 any NOTICE file (and doesn't ship the license file of protobuf):

 https://repository.apache.org/content/repositories/orgapacheflink-1415/org/apache/flink/statefun-flink-common/3.0.0/statefun-flink-common-3.0.0.jar

 I fear that this could be a blocker for the release?

 Otherwise, I did the following check:

 - src distribution looks fine: No binaries, js related files are
 declared
 (the copyright in the NOTICE file could be updated to 2021, but that's
 not
 a blocker)


 On Fri, Apr 2, 2021 at 8:29 AM Yu Li  wrote:

 > +1 (binding)
 >
 > Checked sums and signatures: OK
 > Checked RAT and end-to-end tests: OK
 > Checked version in pom/README/setup.py files: OK
 > Checked release notes: OK
 > Checked docker PR: OK
 >
 > Thanks for driving this release, Gordon!
 >
 > Best Regards,
 > Yu
 >
 >
 > On Fri, 2 Apr 2021 at 09:22, Seth Wiesman 
 wrote:
 >
 > > +1 (non-binding)
 > >
 > > - Built from source and executed end to end tests
 > > - Checked licenses and signatures
 > > - Deployed remote Java SDK to gke cluster
 > > - Took savepoint and statefully rescaled
 > >
 > > Seth
 > >
 > > On Thu, Apr 1, 2021 at 9:05 AM Konstantin Knauf 
 > wrote:
 > >
 > > > +1 (non-binding)
 > > >
 > > > - mvn clean install -Prun-e2e-tests (java 8) from source
 > > > - python3 -m unittest tests
 > > > - spin up Statefun Cluster on EKS with an image built from the
 > > Dockerfiles
 > > > of [1]
 > > > - run Python & Java Greeter example on AWS Lambda
 > > > - read through documentation (opened [2] to fix some typos)
 > > >
 > > > [1] https://github.com/apache/flink-statefun-docker/pull/13
 > > > [2] https://github.com/apache/flink-statefun/pull/219
 > > >
 > > > On Thu, Apr 1, 2021 at 6:46 AM Tzu-Li (Gordon) Tai <
 > tzuli...@apache.org>
 > > > wrote:
 > > >
 > > > > +1 (binding)
 > > > >
 > > > > - verified signatures and hashes
 > > > > - NOTICE and LICENSE files in statefun-flink-distribution,
 > > > > statefun-protobuf-shaded, and statefun-sdk-java looks sane
 > > > > - maven clean install -Prun-e2e-tests (java 8) from source
 > > > > - ran all examples and tutorials in
 apache/flink-statefun-playground
 > > with
 > > > > the new artifacts
 > > > > - Ran my SDK verifier utility [1] against the new Java and
 Python
 > SDKs.
 > > > >
 > > > > Cheers,
 > > > > Gordon
 > > > >
 > > > > [1] https://github.com/tzulitai/statefun-sdk-verifier
 > > > >
 > > > > On Wed, Mar 31, 2021 at 8:50 PM Igal Shilman <
 i...@ververica.com>
 > > wrote:
 > > > >
 > > > > > Thanks Gordon for managing the release!
 > > > > >
 > > > > > +1 (non binding) from my side:
 > > > > >
 > > > > > Here are the results of 

[jira] [Created] (FLINK-22148) Planner rules should use RexCall#equals to check whether two RexCalls are equivalent

2021-04-08 Thread Shuo Cheng (Jira)
Shuo Cheng created FLINK-22148:
--

 Summary: Planner rules should use RexCall#equals to check whether 
two RexCalls are equivalent
 Key: FLINK-22148
 URL: https://issues.apache.org/jira/browse/FLINK-22148
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.2
Reporter: Shuo Cheng
 Fix For: 1.13.0


Reproduce the bug by adding the following test to `SemiAntiJoinTest`:

 
{code:java}
// Scala test method; add it to SemiAntiJoinTest.
@Test
def testNotSimplifyJoinConditionWithSameDigest(): Unit = {
  val sqlQuery =
    """
      |SELECT a
      |FROM l
      |WHERE c NOT IN (
      |SELECT f FROM r WHERE f = c)
      |""".stripMargin
  util.verifyRelPlan(sqlQuery)
}
{code}
 

A CannotPlanException will be thrown. This is because the Calcite planner 
normalizes a RexCall in the `equals` method (since 1.24), while Flink planner 
rules still use toString to check whether two RexCalls are equivalent.
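
To show why the two checks can disagree, here is a toy Java sketch. EqualsCall
is a hypothetical stand-in written for this example, not Calcite's RexCall, and
it does not reproduce Calcite's actual normalization. It only assumes that
`equals` treats the operands of "=" as interchangeable while `toString` keeps
them in the order they were written.

```java
public class DigestVsEquals {

    /** Hypothetical stand-in for a binary "=" call; NOT Calcite's RexCall. */
    static final class EqualsCall {
        final String left;
        final String right;

        EqualsCall(String left, String right) {
            this.left = left;
            this.right = right;
        }

        /** The string form keeps the operands in the order they were written. */
        @Override
        public String toString() {
            return "=(" + left + ", " + right + ")";
        }

        /** Equality ignores operand order because "=" is commutative. */
        @Override
        public boolean equals(Object other) {
            if (!(other instanceof EqualsCall)) {
                return false;
            }
            EqualsCall that = (EqualsCall) other;
            return (left.equals(that.left) && right.equals(that.right))
                    || (left.equals(that.right) && right.equals(that.left));
        }

        /** Symmetric in both operands, so it is consistent with equals. */
        @Override
        public int hashCode() {
            return left.hashCode() + right.hashCode();
        }
    }

    /** Equivalence check based on the string form, as a rule might do today. */
    static boolean sameByDigest(EqualsCall a, EqualsCall b) {
        return a.toString().equals(b.toString());
    }

    /** Equivalence check based on equals. */
    static boolean sameByEquals(EqualsCall a, EqualsCall b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        EqualsCall fEqC = new EqualsCall("f", "c");
        EqualsCall cEqF = new EqualsCall("c", "f");
        System.out.println(sameByDigest(fEqC, cEqF)); // prints "false"
        System.out.println(sameByEquals(fEqC, cEqF)); // prints "true"
    }
}
```

A rule that mixes both kinds of comparison can therefore treat the same pair of
calls as equivalent in one place and different in another.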





Re: [DISCUSS] Backport FLIP-27 Kafka source connector fixes with API change to release-1.12.

2021-04-08 Thread Arvid Heise
Hi Becket,

did you need to change anything in the source interface itself? Wouldn't it
be possible for users to simply use the 1.13 connector with their Flink
1.12 deployment?

I think the late-upgrade argument can be made for any feature, but I also
see that the Kafka connector is of high interest.

I'd second Till's question whether there is an issue for users who start with
the current Kafka source (+bugfixes) and later upgrade to the 1.13 Kafka source
with API changes.

Best,

Arvid

On Thu, Apr 8, 2021 at 2:54 AM Becket Qin  wrote:

> Thanks for the comment, Till and Thomas.
>
> As far as I know there are some users who have just upgraded their Flink
> version from 1.8 / 1.9 to Flink 1.12 and might not upgrade the version in 6
> months or more. There are also some organizations that have the strategy of
> not running the latest version of a project, but only the second latest
> version with bug fixes. So those users may still benefit from the backport.
> However, arguably the old Kafka source is there anyways in 1.12, so they
> should not be blocked on having the new source.
>
> I am leaning towards backporting the fixes mainly because this way we might
> have more users migrating to the new Source and provide feedback. It will
> take some time for the users to pick up 1.13, especially for the users
> running Flink at large scale. So backporting the fixes to 1.12 would help
> get the new source to be used sooner.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 8, 2021 at 12:40 AM Thomas Weise  wrote:
>
> > Hi,
> >
> > Thanks for fixing the new KafkaSource issues.
> >
> > I'm interested in using these fixes with 1.12 for experimental purposes.
> >
> > +1 for backporting. 1.12 is the current stable release and users who
> would
> > like to try the FLIP-27 sources are likely to use that release.
> >
> > Thomas
> >
> > On Wed, Apr 7, 2021 at 2:50 AM Till Rohrmann 
> wrote:
> >
> > > Hi Becket,
> > >
> > > If I remember correctly, we deliberately did not document the Kafka
> > > connector in the 1.12 release. Hence, from this point there should be
> no
> > > need to backport any fixes because users are not aware of this feature.
> > >
> > > On the other hand this also means that we should be able to break
> > anything
> > > we want to. Consequently, backporting these fixes should be possible.
> > >
> > > The question would probably be whether we want to ship new features
> with
> > a
> > > bug fix release. Do we know of any users who want to use the new Kafka
> > > source, are using the 1.12 version and cannot upgrade to 1.13 once it
> is
> > > released? If this is the case, then this could be an argument for
> > shipping
> > > this feature with a bug fix release. If not, then we could save some
> work
> > > by not backporting it.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Apr 7, 2021 at 10:43 AM Becket Qin 
> wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I'd like to start a discussion thread about backporting some FLIP-27
> > > Kafka
> > > > source connector fixes to release-1.12. These fixes include some API
> > > > changes and thus need a public discussion.
> > > >
> > > > The tickets in question are the following:
> > > > https://issues.apache.org/jira/browse/FLINK-20379
> > > > https://issues.apache.org/jira/browse/FLINK-20114
> > > > https://issues.apache.org/jira/browse/FLINK-21817
> > > >
> > > > Without these fixes, the FLIP-27 Kafka source in release-1.12 is not
> > > really
> > > > usable, and the API changes only affect the Kafka Source. So it seems
> > > > breaking the API in this case is still worthwhile.
> > > >
> > > > It would be good to see what others think.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > >
> >
>