[jira] [Created] (FLINK-15937) Correct the Development Status for PyFlink

2020-02-05 Thread sunjincheng (Jira)
sunjincheng created FLINK-15937:
---

 Summary: Correct the Development Status for PyFlink
 Key: FLINK-15937
 URL: https://issues.apache.org/jira/browse/FLINK-15937
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.9.3, 1.10.0, 1.11.0
Reporter: sunjincheng
Assignee: sunjincheng


Correct the `Development Status` value.

From `'Development Status :: 1 - Planning'` to `'Development Status :: 5 - Production/Stable'`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15936) TaskExecutorTest#testSlotAcceptance deadlocks

2020-02-05 Thread Gary Yao (Jira)
Gary Yao created FLINK-15936:


 Summary: TaskExecutorTest#testSlotAcceptance deadlocks
 Key: FLINK-15936
 URL: https://issues.apache.org/jira/browse/FLINK-15936
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination, Tests
Affects Versions: 1.10.0
Reporter: Gary Yao
 Fix For: 1.10.1


https://api.travis-ci.org/v3/job/646510877/log.txt

{noformat}
"main" #1 prio=5 os_prio=0 tid=0x7f2f5800b800 nid=0x499 waiting on 
condition [0x7f2f61733000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x8669b3a8> (a 
java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutorTest.testSlotAcceptance(TaskExecutorTest.java:875)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{noformat}
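For illustration only (this is not the actual fix for this ticket): the dump shows the test's main thread parked in an unbounded CompletableFuture.get(). A minimal sketch of bounding such a wait, so that a hang surfaces as a diagnosable test failure instead of a deadlock (the names below are made up):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedFutureWait {

    public static void main(String[] args) throws Exception {
        // Stand-in for the future the test waits on; it is never completed here.
        CompletableFuture<String> offerResultFuture = new CompletableFuture<>();

        // offerResultFuture.get() would park the calling thread forever,
        // which is the situation shown in the thread dump above.
        try {
            String result = offerResultFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Completed with: " + result);
        } catch (TimeoutException e) {
            // A bounded wait turns a silent deadlock into a visible failure.
            System.err.println("Future did not complete within 10 seconds");
        }
    }
}
{code}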



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.10.0, release candidate #2

2020-02-05 Thread Jeff Zhang
-1, I just found one critical issue
https://issues.apache.org/jira/browse/FLINK-15935
This ticket means users are unable to use watermarks in SQL if they specify both
the Flink planner and the Blink planner in their pom.xml:


<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
   <version>${project.version}</version>
</dependency>

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
   <version>${project.version}</version>
</dependency>



On Thu, Feb 6, 2020 at 5:16 AM, Thomas Weise wrote:

> I deployed commit 81cf2f9e59259389a6549b07dcf822ec63c899a4 and can confirm
> that the dataformat-cbor and checkpoint alignment metric issues are
> resolved.
>
>
> On Wed, Feb 5, 2020 at 11:26 AM Gary Yao  wrote:
>
> > Note that there is currently an ongoing discussion about whether
> > FLINK-15917
> > and FLINK-15918 should be fixed in 1.10.0 [1].
> >
> > [1]
> >
> >
> http://mail-archives.apache.org/mod_mbox/flink-dev/202002.mbox/%3CCA%2B5xAo3D21-T5QysQg3XOdm%3DL9ipz3rMkA%3DqMzxraJRgfuyg2A%40mail.gmail.com%3E
> >
> > On Wed, Feb 5, 2020 at 8:00 PM Gary Yao  wrote:
> >
> > > Hi everyone,
> > > Please review and vote on the release candidate #2 for the version
> > 1.10.0,
> > > as follows:
> > > [ ] +1, Approve the release
> > > [ ] -1, Do not approve the release (please provide specific comments)
> > >
> > >
> > > The complete staging area is available for your review, which includes:
> > > * JIRA release notes [1],
> > > * the official Apache source release and binary convenience releases to
> > be
> > > deployed to dist.apache.org [2], which are signed with the key with
> > > fingerprint BB137807CEFBE7DD2616556710B12A1F89C115E8 [3],
> > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > * source code tag "release-1.10.0-rc2" [5],
> > > * website pull request listing the new release and adding announcement
> > > blog post [6][7].
> > >
> > > The vote will be open for at least 24 hours. It is adopted by majority
> > > approval, with at least 3 PMC affirmative votes.
> > >
> > > Thanks,
> > > Yu & Gary
> > >
> > > [1]
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
> > > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.0-rc2/
> > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [4]
> > https://repository.apache.org/content/repositories/orgapacheflink-1332
> > > [5] https://github.com/apache/flink/releases/tag/release-1.10.0-rc2
> > > [6] https://github.com/apache/flink-web/pull/302
> > > [7] https://github.com/apache/flink-web/pull/301
> > >
> >
>


-- 
Best Regards

Jeff Zhang


[jira] [Created] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner

2020-02-05 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15935:
--

 Summary: Unable to use watermark when depends both on flink 
planner and blink planner
 Key: FLINK-15935
 URL: https://issues.apache.org/jira/browse/FLINK-15935
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Jeff Zhang


Run the following code in the `flink-table-examples` module (it must be run under this module):
{code:java}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.examples.java;


import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * javadoc.
 */
public class TableApiExample {

   /**
*
* @param args
* @throws Exception
*/
   public static void main(String[] args) throws Exception {

  StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
  bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
  StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
  bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
 "status  STRING,\n" +
 "direction STRING,\n" +
 "event_ts TIMESTAMP(3),\n" +
 "WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
 ") WITH (\n" +
 "  'connector.type' = 'kafka',   \n" +
 "  'connector.version' = 'universal',\n" +
 "  'connector.topic' = 'generated.events2',\n" +
 "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
 "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
 "  'connector.properties.group.id' = 'testGroup',\n" +
 "  'format.type'='json',\n" +
 "  'update-mode' = 'append'\n" +
 ")\n");

   }
}
 {code}
 

It fails with the following error:
{code:java}

Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
	at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
	at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501)
	at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
	at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
	at 

[jira] [Created] (FLINK-15934) RocksDB rocksdb_delete_helper return false

2020-02-05 Thread forideal (Jira)
forideal created FLINK-15934:


 Summary: RocksDB rocksdb_delete_helper return false
 Key: FLINK-15934
 URL: https://issues.apache.org/jira/browse/FLINK-15934
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.8.0
Reporter: forideal


Hi folks, we hit the following fatal JVM error in the RocksDB JNI code:

{code:java}
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7f481056b45e, pid=1, tid=0x7f474b3f3700
#
# JRE version: OpenJDK Runtime Environment (8.0_181-b13) (build 
1.8.0_181-8u181-b13-2~deb9u1-b13)
# Java VM: OpenJDK 64-Bit Server VM (25.181-b13 mixed mode linux-amd64 
compressed oops)
# Problematic frame:
# C[librocksdbjni-linux64.so+0x2fe45e]  rocksdb_delete_helper(JNIEnv_*, 
rocksdb::DB*, rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, 
_jbyteArray*, int, int)+0xce
#
# Core dump written. Default location: /opt/flink-1.8.0/core or core.1
#
# An error report file with more information is saved as:
# /tmp/hs_err_pid1.log
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15933) update content of how generic table schema is stored in hive via HiveCatalog

2020-02-05 Thread Bowen Li (Jira)
Bowen Li created FLINK-15933:


 Summary: update content of how generic table schema is stored in 
hive via HiveCatalog
 Key: FLINK-15933
 URL: https://issues.apache.org/jira/browse/FLINK-15933
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Bowen Li
Assignee: Rui Li


FLINK-15858 updated how generic table schemas are stored in the Hive metastore. We need
to go through the documentation and update the related content, e.g.:

[https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_catalog.html#step-4-start-sql-client-and-create-a-kafka-table-with-flink-sql-ddl]

 

cc [~lzljs3620320]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Improve TableFactory to add Context

2020-02-05 Thread Bowen Li
+1, LGTM

On Tue, Feb 4, 2020 at 11:28 PM Jark Wu  wrote:

> +1 from my side.
> Thanks for driving this.
>
> Btw, could you also attach a JIRA issue with the changes described in it,
> so that users can find the issue through the mailing list in the future.
>
> Best,
> Jark
>
> On Wed, 5 Feb 2020 at 13:38, Kurt Young  wrote:
>
> > +1 from my side.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Feb 5, 2020 at 10:59 AM Jingsong Li 
> > wrote:
> >
> > > Hi all,
> > >
> > > Interface updated.
> > > Please re-vote.
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Tue, Feb 4, 2020 at 1:28 PM Jingsong Li 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to start the vote for the improvement of
> > > > TableFactory, which has been discussed and
> > > > has reached a consensus in the discussion thread [1].
> > > >
> > > > The vote will be open for at least 72 hours. I'll close it after that
> > > > unless there is an objection or not enough votes.
> > > >
> > > > [1]
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-TableFactory-td36647.html
> > > >
> > > > Best,
> > > > Jingsong Lee
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>


[jira] [Created] (FLINK-15932) Add download url to hive dependencies

2020-02-05 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-15932:


 Summary: Add download url to hive dependencies
 Key: FLINK-15932
 URL: https://issues.apache.org/jira/browse/FLINK-15932
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive, Documentation
Reporter: Jingsong Lee
 Fix For: 1.11.0


Currently, in
[https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies]

we list all dependencies for all supported Hive versions, but there are no download URLs, which is not very handy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15931) Add utility scripts / tooling for releasing Stateful Functions

2020-02-05 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-15931:
---

 Summary: Add utility scripts / tooling for releasing Stateful 
Functions
 Key: FLINK-15931
 URL: https://issues.apache.org/jira/browse/FLINK-15931
 Project: Flink
  Issue Type: Sub-task
  Components: Stateful Functions
Affects Versions: statefun-1.1
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


As a starting point, I'd like to adopt the tools and steps that 
{{apache/flink}} uses for releasing.

For Stateful Functions, we need scripts to:
* Create release branches
* Create and sign source distribution
* Stage Maven artifacts



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15930) Setup Stateful Function's Spotless plugin to check Javadoc violations to comply with Maven Javadoc plugin

2020-02-05 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-15930:
---

 Summary: Setup Stateful Function's Spotless plugin to check 
Javadoc violations to comply with Maven Javadoc plugin
 Key: FLINK-15930
 URL: https://issues.apache.org/jira/browse/FLINK-15930
 Project: Flink
  Issue Type: Task
  Components: Build System / Stateful Functions, Stateful Functions
Affects Versions: statefun-1.1
Reporter: Tzu-Li (Gordon) Tai


The Spotless plugin used by Stateful Functions currently does not properly check for
Javadocs that violate the style enforced by the Maven Javadoc plugin.
This was apparent when attempting to build the project as in 
https://github.com/apache/flink-statefun/pull/12.
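As a hedged illustration of the kind of mismatch being described (the example is made up, not taken from the actual PR): doclint, which is enabled by default with the JDK 8+ javadoc tool, rejects constructs such as self-closing HTML elements in Javadoc, while a formatter typically leaves them untouched.

{code:java}
// Hypothetical illustration, not from the Stateful Functions code base.
public class JavadocStyleExample {

    /**
     * Summary sentence.
     *
     * <p/>
     * The self-closing paragraph tag above is accepted by most formatters but
     * rejected by doclint ("self-closing element not allowed").
     */
    public void process(String input) {
        // no-op
    }
}
{code}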



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15929) test_dependency failed on travis

2020-02-05 Thread Dian Fu (Jira)
Dian Fu created FLINK-15929:
---

 Summary: test_dependency failed on travis
 Key: FLINK-15929
 URL: https://issues.apache.org/jira/browse/FLINK-15929
 Project: Flink
  Issue Type: Test
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.1, 1.11.0


The Python test "test_dependency" is unstable. It failed on Travis with the
following output:
{code}
"Source: PythonInputFormatTableSource(a, b) -> 
SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_862767019,
 source: [PythonInputFormatTableSource(a, b)]], fields=[a, b]) -> 
StreamExecPythonCalc -> Calc(select=[f0 AS _c0, a]) -> SinkConversionToRow -> 
Map -> Sink: Unnamed (1/2)" #581 prio=5 os_prio=0 cpu=32.04ms elapsed=302.56s 
tid=0x01f26000 nid=0x4662 waiting on condition  [0x7f0acb7f5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.2/Native Method)
- parking to wait for  <0x8aa3bfc0> (a 
java.util.concurrent.CompletableFuture$Signaller)
at 
java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.2/LockSupport.java:234)
at 
java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.2/CompletableFuture.java:1798)
at 
java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.2/ForkJoinPool.java:3128)
at 
java.util.concurrent.CompletableFuture.timedGet(java.base@11.0.2/CompletableFuture.java:1868)
at 
java.util.concurrent.CompletableFuture.get(java.base@11.0.2/CompletableFuture.java:2021)
at 
org.apache.beam.runners.fnexecution.control.MapControlClientPool.getClient(MapControlClientPool.java:69)
at 
org.apache.beam.runners.fnexecution.control.MapControlClientPool$$Lambda$1090/0x000100d70040.take(Unknown
 Source)
at 
org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:126)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
- locked <0x8aa02788> (a 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$StrongEntry)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
at 
org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:179)
at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:193)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:139)
at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:143)
at 
org.apache.flink.table.runtime.operators.python.BaseRowPythonScalarFunctionOperator.open(BaseRowPythonScalarFunctionOperator.java:86)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$874/0x000100a4d840.run(Unknown
 Source)
at 

[jira] [Created] (FLINK-15928) Batch mode in blink planner caused IndexOutOfBoundsException error

2020-02-05 Thread Fanbin Bu (Jira)
Fanbin Bu created FLINK-15928:
-

 Summary: Batch mode in blink planner caused 
IndexOutOfBoundsException error
 Key: FLINK-15928
 URL: https://issues.apache.org/jira/browse/FLINK-15928
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.2
Reporter: Fanbin Bu


Flink version: 1.9.2

mode: Batch mode, running on EMR with YARN

The details are as follows:

Table source sample:

class SnowflakeTableSource(val schema: TableSchema,
                           val parallelism: Int,
                           val fetchSize: Int,
                           val query: String,
                           val options: SnowflakeOptions)
  extends StreamTableSource[Row] {

  override def getDataStream(execEnv: StreamExecutionEnvironment): SingleOutputStreamOperator[Row] = {
    execEnv.createInput(getInputFormat, getReturnType).name("app_event_stream")
  }

  override def getReturnType: TypeInformation[Row] = schema.toRowType

  override def getTableSchema: TableSchema = schema

  override def isBounded: Boolean = true

  private def getInputFormat: JDBCInputFormat = {
    JDBCInputFormat.buildJDBCInputFormat
      .setDrivername(options.driverName)
      .setDBUrl(options.dbUrl)
      .setUsername(options.username)
      .setPassword(options.password)
      .setQuery(query)
      .setRowTypeInfo(getInputRowTypeInfo)
      .setFetchSize(fetchSize)
      .setParametersProvider(new GenericParameterValuesProvider(buildQueryParams(parallelism)))
      .finish
  }
}

 

Here is the sample setup code:

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()

val tableEnv = TableEnvironment.create(settings)
val configurations = tableEnv.getConfig.getConfiguration

configurations.setString(
  TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY.key,
  s"${Globals.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY} mb")

tableEnv.registerTableSource(tableName, tableSource)

queryResult = tableEnv.sqlQuery(sql)

tableEnv.execute()

 

Here is the sample SQL:

select
  ip_address
  , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
  , sum(case when name = 'signin' then 1 else 0 end) as signin_count_1m
  , sum(case when name = 'signin_failure' then 1 else 0 end) as signin_failure_count_1m
  ...
from events
group by
  ip_address
  , hop(created_at, interval '30' second, interval '1' minute)

 

Here is the stacktrace:

java.lang.IndexOutOfBoundsException
	at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
	at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
	at HashWinAggWithKeys$538.endInput(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)

 

The fact that the same code works well with other SQL queries, together with the
stack trace, suggests that this might be a memory-related issue. It only happens
with the Blink planner in batch mode; the same query works when I use
BatchTableEnvironment with the old planner.
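In case the memory hypothesis holds, here is a minimal sketch (untested, Java API instead of the Scala above) of raising the managed memory reserved for hash aggregation via the same option key that the setup code uses; "512 mb" is an arbitrary example value, not a recommendation:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HashAggMemoryWorkaround {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Same key as TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY in the setup code above.
        Configuration conf = tableEnv.getConfig().getConfiguration();
        conf.setString("table.exec.resource.hash-agg.memory", "512 mb");
    }
}
{code}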

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.10.0, release candidate #2

2020-02-05 Thread Thomas Weise
I deployed commit 81cf2f9e59259389a6549b07dcf822ec63c899a4 and can confirm
that the dataformat-cbor and checkpoint alignment metric issues are
resolved.


On Wed, Feb 5, 2020 at 11:26 AM Gary Yao  wrote:

> Note that there is currently an ongoing discussion about whether
> FLINK-15917
> and FLINK-15918 should be fixed in 1.10.0 [1].
>
> [1]
>
> http://mail-archives.apache.org/mod_mbox/flink-dev/202002.mbox/%3CCA%2B5xAo3D21-T5QysQg3XOdm%3DL9ipz3rMkA%3DqMzxraJRgfuyg2A%40mail.gmail.com%3E
>
> On Wed, Feb 5, 2020 at 8:00 PM Gary Yao  wrote:
>
> > Hi everyone,
> > Please review and vote on the release candidate #2 for the version
> 1.10.0,
> > as follows:
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> > The complete staging area is available for your review, which includes:
> > * JIRA release notes [1],
> > * the official Apache source release and binary convenience releases to
> be
> > deployed to dist.apache.org [2], which are signed with the key with
> > fingerprint BB137807CEFBE7DD2616556710B12A1F89C115E8 [3],
> > * all artifacts to be deployed to the Maven Central Repository [4],
> > * source code tag "release-1.10.0-rc2" [5],
> > * website pull request listing the new release and adding announcement
> > blog post [6][7].
> >
> > The vote will be open for at least 24 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Yu & Gary
> >
> > [1]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
> > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.0-rc2/
> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1332
> > [5] https://github.com/apache/flink/releases/tag/release-1.10.0-rc2
> > [6] https://github.com/apache/flink-web/pull/302
> > [7] https://github.com/apache/flink-web/pull/301
> >
>


Re: [VOTE] Release 1.10.0, release candidate #1

2020-02-05 Thread Thomas Weise
I think that's a good idea.

(Opt-in for existing users, until the backward compatibility issues are
resolved.)


On Wed, Feb 5, 2020 at 11:57 AM Arvid Heise  wrote:

> Couldn't we treat a missing option as legacy, but set the new scheduler as
> the default value in all newly shipped flink-conf.yaml?
>
> In this way, old users get the old behavior (either implicitly or
> explicitly) unless they explicitly upgrade.
> New users benefit from the new scheduler.
>
> On Wed, Feb 5, 2020 at 8:13 PM Gary Yao  wrote:
>
> > It is indeed unfortunate that these issues are discovered only now. I
> think
> > Thomas has a valid point, and we might be risking the trust of our users
> > here.
> >
> > What are our options?
> >
> > 1. Document this behavior and how to work around it copiously in the
> > release notes [1]
> > 2. Try to restore the previous behavior
> > 3. Change default value of jobmanager.scheduler to "legacy" and
> rollout
> > the feature in 1.11
> > 4. Change default value of jobmanager.scheduler to "legacy" and
> rollout
> > the feature earliest in 1.10.1
> >
> > [1]
> >
> >
> https://github.com/apache/flink/pull/10997/files#diff-b84c5611825842e8f74301ca70d94d23R86
> >
> > On Wed, Feb 5, 2020 at 7:24 PM Stephan Ewen  wrote:
> >
> > > Should we make these a blocker? I am not sure - we could also clearly
> > > state in the release notes how to restore the old behavior, if your
> setup
> > > assumes that behavior.
> > >
> > > Release candidates for this release have been out since mid December,
> it
> > > is a bit unfortunate that these things have been raised so late.
> > > Having these rather open ended tickets (how to re-define the existing
> > > metrics in the new scheduler/failover handling) now as release blockers
> > > would mean that 1.10 is indefinitely delayed.
> > >
> > > Are we sure we want to do that?
> > >
> > > On Wed, Feb 5, 2020 at 6:53 PM Thomas Weise  wrote:
> > >
> > >> Hi Gary,
> > >>
> > >> Thanks for the clarification!
> > >>
> > >> When we upgrade to a new Flink release, we don't start with a default
> > >> flink-conf.yaml but upgrade our existing tooling and configuration.
> > >> Therefore we notice this issue as part of the upgrade to 1.10, and not
> > >> when
> > >> we upgraded to 1.9.
> > >>
> > >> I would expect many other users to be in the same camp, and therefore
> > >> consider making these regressions a blocker for 1.10?
> > >>
> > >> Thanks,
> > >> Thomas
> > >>
> > >>
> > >> On Wed, Feb 5, 2020 at 4:53 AM Gary Yao  wrote:
> > >>
> > >> > > also notice that the exception causing a restart is no longer
> > >> displayed
> > >> > > in the UI, which is probably related?
> > >> >
> > >> > Yes, this is also related to the new scheduler. I created
> FLINK-15917
> > >> [1]
> > >> > to
> > >> > track this. Moreover, I created a ticket about the uptime metric not
> > >> > resetting
> > >> > [2]. Both issues already exist in 1.9 if
> > >> > "jobmanager.execution.failover-strategy" is set to "region", which
> is
> > >> the
> > >> > case
> > >> > in the default flink-conf.yaml.
> > >> >
> > >> > In 1.9, unsetting "jobmanager.execution.failover-strategy" was
> enough
> > to
> > >> > fall
> > >> > back to the previous behavior.
> > >> >
> > >> > In 1.10, you can still fall back to the previous behavior by setting
> > >> > "jobmanager.scheduler: legacy" and unsetting
> > >> > "jobmanager.execution.failover-strategy" in your flink-conf.yaml
> > >> >
> > >> > I would not consider these issues blockers since there is a
> workaround
> > >> for
> > >> > them, but of course we would like to see the new scheduler getting
> > some
> > >> > production exposure. More detailed release notes about the caveats
> of
> > >> the
> > >> > new
> > >> > scheduler will be added to the user documentation.
> > >> >
> > >> >
> > >> > > The watermark issue was
> > >> > https://issues.apache.org/jira/browse/FLINK-14470
> > >> >
> > >> > This should be fixed now [3].
> > >> >
> > >> >
> > >> > [1] https://issues.apache.org/jira/browse/FLINK-15917
> > >> > [2] https://issues.apache.org/jira/browse/FLINK-15918
> > >> > [3] https://issues.apache.org/jira/browse/FLINK-8949
> > >> >
> > >> > On Wed, Feb 5, 2020 at 7:04 AM Thomas Weise  wrote:
> > >> >
> > >> >> Hi Gary,
> > >> >>
> > >> >> Thanks for the reply.
> > >> >>
> > >> >> -->
> > >> >>
> > >> >> On Tue, Feb 4, 2020 at 5:20 AM Gary Yao  wrote:
> > >> >>
> > >> >> > Hi Thomas,
> > >> >> >
> > >> >> > > 2) Was there a change in how job recovery reflects in the
> uptime
> > >> >> metric?
> > >> >> > > Didn't uptime previously reset to 0 on recovery (now it just
> > keeps
> > >> >> > > increasing)?
> > >> >> >
> > >> >> > The uptime is the difference between the current time and the
> time
> > >> when
> > >> >> the
> > >> >> > job transitioned to RUNNING state. By default we no longer
> > transition
> > >> >> the
> > >> >> > job
> > >> >> > out of the RUNNING state when restarting. This has something to
> do
> > >> with
> > >> >> the
> > >> >> > new 

Re: [VOTE] Release 1.10.0, release candidate #1

2020-02-05 Thread Arvid Heise
Couldn't we treat a missing option as legacy, but set the new scheduler as
the default value in all newly shipped flink-conf.yaml?

In this way, old users get the old behavior (either implicitly or
explicitly) unless they explicitly upgrade.
New users benefit from the new scheduler.

On Wed, Feb 5, 2020 at 8:13 PM Gary Yao  wrote:

> It is indeed unfortunate that these issues are discovered only now. I think
> Thomas has a valid point, and we might be risking the trust of our users
> here.
>
> What are our options?
>
> 1. Document this behavior and how to work around it copiously in the
> release notes [1]
> 2. Try to restore the previous behavior
> 3. Change default value of jobmanager.scheduler to "legacy" and rollout
> the feature in 1.11
> 4. Change default value of jobmanager.scheduler to "legacy" and rollout
> the feature earliest in 1.10.1
>
> [1]
>
> https://github.com/apache/flink/pull/10997/files#diff-b84c5611825842e8f74301ca70d94d23R86
>
> On Wed, Feb 5, 2020 at 7:24 PM Stephan Ewen  wrote:
>
> > Should we make these a blocker? I am not sure - we could also clearly
> > state in the release notes how to restore the old behavior, if your setup
> > assumes that behavior.
> >
> > Release candidates for this release have been out since mid December, it
> > is a bit unfortunate that these things have been raised so late.
> > Having these rather open ended tickets (how to re-define the existing
> > metrics in the new scheduler/failover handling) now as release blockers
> > would mean that 1.10 is indefinitely delayed.
> >
> > Are we sure we want to do that?
> >
> > On Wed, Feb 5, 2020 at 6:53 PM Thomas Weise  wrote:
> >
> >> Hi Gary,
> >>
> >> Thanks for the clarification!
> >>
> >> When we upgrade to a new Flink release, we don't start with a default
> >> flink-conf.yaml but upgrade our existing tooling and configuration.
> >> Therefore we notice this issue as part of the upgrade to 1.10, and not
> >> when
> >> we upgraded to 1.9.
> >>
> >> I would expect many other users to be in the same camp, and therefore
> >> consider making these regressions a blocker for 1.10?
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >> On Wed, Feb 5, 2020 at 4:53 AM Gary Yao  wrote:
> >>
> >> > > also notice that the exception causing a restart is no longer
> >> displayed
> >> > > in the UI, which is probably related?
> >> >
> >> > Yes, this is also related to the new scheduler. I created FLINK-15917
> >> [1]
> >> > to
> >> > track this. Moreover, I created a ticket about the uptime metric not
> >> > resetting
> >> > [2]. Both issues already exist in 1.9 if
> >> > "jobmanager.execution.failover-strategy" is set to "region", which is
> >> the
> >> > case
> >> > in the default flink-conf.yaml.
> >> >
> >> > In 1.9, unsetting "jobmanager.execution.failover-strategy" was enough
> to
> >> > fall
> >> > back to the previous behavior.
> >> >
> >> > In 1.10, you can still fall back to the previous behavior by setting
> >> > "jobmanager.scheduler: legacy" and unsetting
> >> > "jobmanager.execution.failover-strategy" in your flink-conf.yaml
> >> >
> >> > I would not consider these issues blockers since there is a workaround
> >> for
> >> > them, but of course we would like to see the new scheduler getting
> some
> >> > production exposure. More detailed release notes about the caveats of
> >> the
> >> > new
> >> > scheduler will be added to the user documentation.
> >> >
> >> >
> >> > > The watermark issue was
> >> > https://issues.apache.org/jira/browse/FLINK-14470
> >> >
> >> > This should be fixed now [3].
> >> >
> >> >
> >> > [1] https://issues.apache.org/jira/browse/FLINK-15917
> >> > [2] https://issues.apache.org/jira/browse/FLINK-15918
> >> > [3] https://issues.apache.org/jira/browse/FLINK-8949
> >> >
> >> > On Wed, Feb 5, 2020 at 7:04 AM Thomas Weise  wrote:
> >> >
> >> >> Hi Gary,
> >> >>
> >> >> Thanks for the reply.
> >> >>
> >> >> -->
> >> >>
> >> >> On Tue, Feb 4, 2020 at 5:20 AM Gary Yao  wrote:
> >> >>
> >> >> > Hi Thomas,
> >> >> >
> >> >> > > 2) Was there a change in how job recovery reflects in the uptime
> >> >> metric?
> >> >> > > Didn't uptime previously reset to 0 on recovery (now it just
> keeps
> >> >> > > increasing)?
> >> >> >
> >> >> > The uptime is the difference between the current time and the time
> >> when
> >> >> the
> >> >> > job transitioned to RUNNING state. By default we no longer
> transition
> >> >> the
> >> >> > job
> >> >> > out of the RUNNING state when restarting. This has something to do
> >> with
> >> >> the
> >> >> > new scheduler which enables pipelined region failover by default
> [1].
> >> >> > Actually
> >> >> > we enabled pipelined region failover already in the binary
> >> distribution
> >> >> of
> >> >> > Flink 1.9 by setting:
> >> >> >
> >> >> > jobmanager.execution.failover-strategy: region
> >> >> >
> >> >> > in the default flink-conf.yaml. Unless you have removed this config
> >> >> option
> >> >> > or
> >> >> > you are using a custom yaml, you should be 

[jira] [Created] (FLINK-15927) TaskExecutor should treat it as a fatal error if Task cannot be failed

2020-02-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-15927:


 Summary: TaskExecutor should treat it as a fatal error if Task 
cannot be failed
 Key: FLINK-15927
 URL: https://issues.apache.org/jira/browse/FLINK-15927
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Reporter: Stephan Ewen


When the TaskManager cannot mark a task as failed, the failure is only logged.
This should instead be treated as a fatal error.

TaskExecutor line 1419:
{code}
try {
task.failExternally(cause);
} catch (Throwable t) {
log.error("Could not fail task {}.", executionAttemptID, t);
}
{code}
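A minimal sketch of the suggested direction (assuming the surrounding TaskExecutor context from the snippet above; the escalation path shown is illustrative, not a reviewed patch):
{code}
try {
    task.failExternally(cause);
} catch (Throwable t) {
    // Escalate instead of only logging: a task that cannot be failed
    // leaves the TaskExecutor in an undefined state.
    onFatalError(new FlinkException(
        "Could not fail task " + executionAttemptID + '.', t));
}
{code}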



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release 1.10.0, release candidate #2

2020-02-05 Thread Gary Yao
Note that there is currently an ongoing discussion about whether FLINK-15917
and FLINK-15918 should be fixed in 1.10.0 [1].

[1]
http://mail-archives.apache.org/mod_mbox/flink-dev/202002.mbox/%3CCA%2B5xAo3D21-T5QysQg3XOdm%3DL9ipz3rMkA%3DqMzxraJRgfuyg2A%40mail.gmail.com%3E

On Wed, Feb 5, 2020 at 8:00 PM Gary Yao  wrote:

> Hi everyone,
> Please review and vote on the release candidate #2 for the version 1.10.0,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint BB137807CEFBE7DD2616556710B12A1F89C115E8 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.10.0-rc2" [5],
> * website pull request listing the new release and adding announcement
> blog post [6][7].
>
> The vote will be open for at least 24 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Yu & Gary
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.0-rc2/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1332
> [5] https://github.com/apache/flink/releases/tag/release-1.10.0-rc2
> [6] https://github.com/apache/flink-web/pull/302
> [7] https://github.com/apache/flink-web/pull/301
>


Re: [VOTE] Release 1.10.0, release candidate #1

2020-02-05 Thread Gary Yao
It is indeed unfortunate that these issues are discovered only now. I think
Thomas has a valid point, and we might be risking the trust of our users
here.

What are our options?

1. Document this behavior and how to work around it copiously in the
release notes [1]
2. Try to restore the previous behavior
3. Change default value of jobmanager.scheduler to "legacy" and rollout
the feature in 1.11
4. Change default value of jobmanager.scheduler to "legacy" and rollout
the feature earliest in 1.10.1

[1]
https://github.com/apache/flink/pull/10997/files#diff-b84c5611825842e8f74301ca70d94d23R86

On Wed, Feb 5, 2020 at 7:24 PM Stephan Ewen  wrote:

> Should we make these a blocker? I am not sure - we could also clearly
> state in the release notes how to restore the old behavior, if your setup
> assumes that behavior.
>
> Release candidates for this release have been out since mid December, it
> is a bit unfortunate that these things have been raised so late.
> Having these rather open ended tickets (how to re-define the existing
> metrics in the new scheduler/failover handling) now as release blockers
> would mean that 1.10 is indefinitely delayed.
>
> Are we sure we want to do that?
>
> On Wed, Feb 5, 2020 at 6:53 PM Thomas Weise  wrote:
>
>> Hi Gary,
>>
>> Thanks for the clarification!
>>
>> When we upgrade to a new Flink release, we don't start with a default
>> flink-conf.yaml but upgrade our existing tooling and configuration.
>> Therefore we notice this issue as part of the upgrade to 1.10, and not
>> when
>> we upgraded to 1.9.
>>
>> I would expect many other users to be in the same camp, and therefore
>> consider making these regressions a blocker for 1.10?
>>
>> Thanks,
>> Thomas
>>
>>
>> On Wed, Feb 5, 2020 at 4:53 AM Gary Yao  wrote:
>>
>> > > also notice that the exception causing a restart is no longer
>> displayed
>> > > in the UI, which is probably related?
>> >
>> > Yes, this is also related to the new scheduler. I created FLINK-15917
>> [1]
>> > to
>> > track this. Moreover, I created a ticket about the uptime metric not
>> > resetting
>> > [2]. Both issues already exist in 1.9 if
>> > "jobmanager.execution.failover-strategy" is set to "region", which is
>> the
>> > case
>> > in the default flink-conf.yaml.
>> >
>> > In 1.9, unsetting "jobmanager.execution.failover-strategy" was enough to
>> > fall
>> > back to the previous behavior.
>> >
>> > In 1.10, you can still fall back to the previous behavior by setting
>> > "jobmanager.scheduler: legacy" and unsetting
>> > "jobmanager.execution.failover-strategy" in your flink-conf.yaml
>> >
>> > I would not consider these issues blockers since there is a workaround
>> for
>> > them, but of course we would like to see the new scheduler getting some
>> > production exposure. More detailed release notes about the caveats of
>> the
>> > new
>> > scheduler will be added to the user documentation.
>> >
>> >
>> > > The watermark issue was
>> > https://issues.apache.org/jira/browse/FLINK-14470
>> >
>> > This should be fixed now [3].
>> >
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-15917
>> > [2] https://issues.apache.org/jira/browse/FLINK-15918
>> > [3] https://issues.apache.org/jira/browse/FLINK-8949
>> >
>> > On Wed, Feb 5, 2020 at 7:04 AM Thomas Weise  wrote:
>> >
>> >> Hi Gary,
>> >>
>> >> Thanks for the reply.
>> >>
>> >> -->
>> >>
>> >> On Tue, Feb 4, 2020 at 5:20 AM Gary Yao  wrote:
>> >>
>> >> > Hi Thomas,
>> >> >
>> >> > > 2) Was there a change in how job recovery reflects in the uptime
>> >> metric?
>> >> > > Didn't uptime previously reset to 0 on recovery (now it just keeps
>> >> > > increasing)?
>> >> >
>> >> > The uptime is the difference between the current time and the time
>> when
>> >> the
>> >> > job transitioned to RUNNING state. By default we no longer transition
>> >> the
>> >> > job
>> >> > out of the RUNNING state when restarting. This has something to do
>> with
>> >> the
>> >> > new scheduler which enables pipelined region failover by default [1].
>> >> > Actually
>> >> > we enabled pipelined region failover already in the binary
>> distribution
>> >> of
>> >> > Flink 1.9 by setting:
>> >> >
>> >> > jobmanager.execution.failover-strategy: region
>> >> >
>> >> > in the default flink-conf.yaml. Unless you have removed this config
>> >> option
>> >> > or
>> >> > you are using a custom yaml, you should be seeing this behavior in
>> Flink
>> >> > 1.9.
>> >> > If you do not want region failover, set
>> >> >
>> >> > jobmanager.execution.failover-strategy: full
>> >> >
>> >> >
>> >> We are using the default (the jobmanager.execution.failover-strategy
>> >> setting is not present in our flink config).
>> >>
>> >> The change in behavior I see is between the 1.9 based deployment and
>> the
>> >> 1.10 RC.
>> >>
>> >> Our 1.9 branch is here:
>> >> https://github.com/lyft/flink/tree/release-1.9-lyft
>> >>
>> >> I also notice that the exception causing a restart is no longer
>> displayed
>> >> in the UI, which 

[VOTE] Release 1.10.0, release candidate #2

2020-02-05 Thread Gary Yao
Hi everyone,
Please review and vote on the release candidate #2 for the version 1.10.0,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint BB137807CEFBE7DD2616556710B12A1F89C115E8 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.10.0-rc2" [5],
* website pull request listing the new release and adding announcement blog
post [6][7].

The vote will be open for at least 24 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Yu & Gary

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.0-rc2/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1332
[5] https://github.com/apache/flink/releases/tag/release-1.10.0-rc2
[6] https://github.com/apache/flink-web/pull/302
[7] https://github.com/apache/flink-web/pull/301


Re: [Discussion] Job generation / submission hooks & Atlas integration

2020-02-05 Thread Gyula Fóra
@Till Rohrmann 
You are completely right that the Atlas hook itself should not live inside
Flink. All other hooks for the other projects are implemented as part of
Atlas,
and the Atlas community is ready to maintain it once we have a working
version. The discussion is more about changes that we need in Flink (if
any) to make it possible to implement the Atlas hook outside Flink.

So in theory I agree that the hook should receive job submissions and just
extract the metadata required by Atlas.

There are 2 questions here (and my initial email gives one possible
solution):

1. What is the component that receives the submission and runs the
extraction logic? If you want to keep this process out of Flink, you could
put something in front of the job submission REST endpoint, but I think that
would be overkill. So I assume we will have to add some way of executing
code on job submission, hence my proposal of a job submission hook (a rough
sketch follows below).

2. What information do we need to extract the Atlas metadata? At job
submission we usually have JobGraph instances, which are pretty hard to
handle compared to, for instance, a StreamGraph when it comes to getting
the source/sink UDFs. I am wondering if we need to make this easier somehow.
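For illustration, a rough sketch of what such a job submission hook could look like (names and signatures are purely hypothetical, none of this exists in Flink today):

// Hypothetical sketch for discussion only.
public interface JobSubmissionHook {

    /** Called once per job, after the pipeline is built and before it is submitted. */
    void onJobSubmission(Context context);

    /** Hypothetical context handed to the hook. */
    interface Context {

        /** The user pipeline (e.g. a StreamGraph), from which sources/sinks could be read. */
        org.apache.flink.api.dag.Pipeline getPipeline();

        /** Configuration of the job being submitted. */
        org.apache.flink.configuration.Configuration getConfiguration();
    }
}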

Gyula

On Wed, Feb 5, 2020 at 6:03 PM Taher Koitawala  wrote:

> As far as I know, Atlas entries can be created with a REST call. Could we not
> create an abstracted Flink operator that makes the REST call on job
> execution/submission?
>
> Regards,
> Taher Koitawala
>
> On Wed, Feb 5, 2020, 10:16 PM Flavio Pompermaier 
> wrote:
>
> > Hi Gyula,
> > thanks for taking care of integrating Flink with Atlas (and, in the end, the
> > Egeria initiative), which is IMHO the most important part of the whole Hadoop
> > ecosystem and which, unfortunately, has been quite overlooked. I can confirm
> > that the integration with Atlas/Egeria is absolutely of great interest.
> >
> > On Wed, Feb 5, 2020 at 5:12 PM, Till Rohrmann wrote:
> >
> > > Hi Gyula,
> > >
> > > thanks for starting this discussion. Before diving in the details of
> how
> > to
> > > implement this feature, I wanted to ask whether it is strictly required
> > > that the Atlas integration lives within Flink or not? Could it also
> work
> > if
> > > you have tool which receives job submissions, extracts the required
> > > information, forwards the job submission to Flink, monitors the
> execution
> > > result and finally publishes some information to Atlas (modulo some
> other
> > > steps which are missing in my description)? Having a different layer
> > being
> > > responsible for this would keep complexity out of Flink.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra  wrote:
> > >
> > > > Hi all!
> > > >
> > > > We have started some preliminary work on the Flink - Atlas
> integration
> > at
> > > > Cloudera. It seems that the integration will require some new hook
> > > > interfaces at the jobgraph generation and submission phases, so I
> > > figured I
> > > > will open a discussion thread with my initial ideas to get some early
> > > > feedback.
> > > >
> > > > *Minimal background*
> > > > Very simply put Apache Atlas is a data governance framework that
> stores
> > > > metadata for our data and processing logic to track ownership,
> lineage
> > > etc.
> > > > It is already integrated with systems like HDFS, Kafka, Hive and many
> > > > others.
> > > >
> > > > Adding Flink integration would mean that we can track the input
> output
> > > data
> > > > of our Flink jobs, their owners and how different Flink jobs are
> > > connected
> > > > to each other through the data they produce (lineage). This seems to
> > be a
> > > > very big deal for a lot of companies :)
> > > >
> > > > *Flink - Atlas integration in a nutshell*
> > > > In order to integrate with Atlas we basically need 2 things.
> > > >  - Flink entity definitions
> > > >  - Flink Atlas hook
> > > >
> > > > The entity definition is the easy part. It is a json that contains
> the
> > > > objects (entities) that we want to store for any give Flink job. As a
> > > > starter we could have a single FlinkApplication entity that has a set
> > of
> > > > inputs and outputs. These inputs/outputs are other Atlas entities
> that
> > > are
> > > > already defines such as Kafka topic or Hbase table.
> > > >
> > > > The Flink atlas hook will be the logic that creates the entity
> instance
> > > and
> > > > uploads it to Atlas when we start a new Flink job. This is the part
> > where
> > > > we implement the core logic.
> > > >
> > > > *Job submission hook*
> > > > In order to implement the Atlas hook we need a place where we can
> > inspect
> > > > the pipeline, create and send the metadata when the job starts. When
> we
> > > > create the FlinkApplication entity we need to be able to easily
> > determine
> > > > the sources and sinks (and their properties) of the pipeline.
> > > >
> > > > Unfortunately there is no JobSubmission hook in Flink that could
> > execute
> > > > this 

Re: [VOTE] Release 1.10.0, release candidate #1

2020-02-05 Thread Stephan Ewen
Should we make these a blocker? I am not sure - we could also clearly state
in the release notes how to restore the old behavior, if your setup assumes
that behavior.

Release candidates for this release have been out since mid December, it is
a bit unfortunate that these things have been raised so late.
Having these rather open ended tickets (how to re-define the existing
metrics in the new scheduler/failover handling) now as release blockers
would mean that 1.10 is indefinitely delayed.

Are we sure we want to do that?

On Wed, Feb 5, 2020 at 6:53 PM Thomas Weise  wrote:

> Hi Gary,
>
> Thanks for the clarification!
>
> When we upgrade to a new Flink release, we don't start with a default
> flink-conf.yaml but upgrade our existing tooling and configuration.
> Therefore we notice this issue as part of the upgrade to 1.10, and not when
> we upgraded to 1.9.
>
> I would expect many other users to be in the same camp, and therefore
> consider making these regressions a blocker for 1.10?
>
> Thanks,
> Thomas
>
>
> On Wed, Feb 5, 2020 at 4:53 AM Gary Yao  wrote:
>
> > > also notice that the exception causing a restart is no longer displayed
> > > in the UI, which is probably related?
> >
> > Yes, this is also related to the new scheduler. I created FLINK-15917 [1]
> > to
> > track this. Moreover, I created a ticket about the uptime metric not
> > resetting
> > [2]. Both issues already exist in 1.9 if
> > "jobmanager.execution.failover-strategy" is set to "region", which is the
> > case
> > in the default flink-conf.yaml.
> >
> > In 1.9, unsetting "jobmanager.execution.failover-strategy" was enough to
> > fall
> > back to the previous behavior.
> >
> > In 1.10, you can still fall back to the previous behavior by setting
> > "jobmanager.scheduler: legacy" and unsetting
> > "jobmanager.execution.failover-strategy" in your flink-conf.yaml
> >
> > I would not consider these issues blockers since there is a workaround
> for
> > them, but of course we would like to see the new scheduler getting some
> > production exposure. More detailed release notes about the caveats of the
> > new
> > scheduler will be added to the user documentation.
> >
> >
> > > The watermark issue was
> > https://issues.apache.org/jira/browse/FLINK-14470
> >
> > This should be fixed now [3].
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-15917
> > [2] https://issues.apache.org/jira/browse/FLINK-15918
> > [3] https://issues.apache.org/jira/browse/FLINK-8949
> >
> > On Wed, Feb 5, 2020 at 7:04 AM Thomas Weise  wrote:
> >
> >> Hi Gary,
> >>
> >> Thanks for the reply.
> >>
> >> -->
> >>
> >> On Tue, Feb 4, 2020 at 5:20 AM Gary Yao  wrote:
> >>
> >> > Hi Thomas,
> >> >
> >> > > 2) Was there a change in how job recovery reflects in the uptime
> >> metric?
> >> > > Didn't uptime previously reset to 0 on recovery (now it just keeps
> >> > > increasing)?
> >> >
> >> > The uptime is the difference between the current time and the time
> when
> >> the
> >> > job transitioned to RUNNING state. By default we no longer transition
> >> the
> >> > job
> >> > out of the RUNNING state when restarting. This has something to do
> with
> >> the
> >> > new scheduler which enables pipelined region failover by default [1].
> >> > Actually
> >> > we enabled pipelined region failover already in the binary
> distribution
> >> of
> >> > Flink 1.9 by setting:
> >> >
> >> > jobmanager.execution.failover-strategy: region
> >> >
> >> > in the default flink-conf.yaml. Unless you have removed this config
> >> option
> >> > or
> >> > you are using a custom yaml, you should be seeing this behavior in
> Flink
> >> > 1.9.
> >> > If you do not want region failover, set
> >> >
> >> > jobmanager.execution.failover-strategy: full
> >> >
> >> >
> >> We are using the default (the jobmanager.execution.failover-strategy
> >> setting is not present in our flink config).
> >>
> >> The change in behavior I see is between the 1.9 based deployment and the
> >> 1.10 RC.
> >>
> >> Our 1.9 branch is here:
> >> https://github.com/lyft/flink/tree/release-1.9-lyft
> >>
> >> I also notice that the exception causing a restart is no longer
> displayed
> >> in the UI, which is probably related?
> >>
> >>
> >> >
> >> > > 1) Is the low watermark display in the UI still broken?
> >> >
> >> > I was not aware that this is broken. Is there an issue tracking this
> >> bug?
> >> >
> >>
> >> The watermark issue was
> https://issues.apache.org/jira/browse/FLINK-14470
> >>
> >> (I don't have a good way to verify it is fixed at the moment.)
> >>
> >> Another problem with this 1.10 RC is that the checkpointAlignmentTime
> >> metric is missing. (I have not been able to investigate this further
> yet.)
> >>
> >>
> >> >
> >> > Best,
> >> > Gary
> >> >
> >> > [1] https://issues.apache.org/jira/browse/FLINK-14651
> >> >
> >> > On Tue, Feb 4, 2020 at 2:56 AM Thomas Weise  wrote:
> >> >
> >> >> I opened a PR for FLINK-15868
> >> >> :
> >> >> 

Re: [VOTE] Release 1.10.0, release candidate #1

2020-02-05 Thread Thomas Weise
Hi Gary,

Thanks for the clarification!

When we upgrade to a new Flink release, we don't start with a default
flink-conf.yaml but upgrade our existing tooling and configuration.
Therefore we noticed this issue as part of the upgrade to 1.10, and not when
we upgraded to 1.9.

I would expect many other users to be in the same camp. Should we therefore
consider making these regressions blockers for 1.10?

Thanks,
Thomas


On Wed, Feb 5, 2020 at 4:53 AM Gary Yao  wrote:

> > also notice that the exception causing a restart is no longer displayed
> > in the UI, which is probably related?
>
> Yes, this is also related to the new scheduler. I created FLINK-15917 [1]
> to
> track this. Moreover, I created a ticket about the uptime metric not
> resetting
> [2]. Both issues already exist in 1.9 if
> "jobmanager.execution.failover-strategy" is set to "region", which is the
> case
> in the default flink-conf.yaml.
>
> In 1.9, unsetting "jobmanager.execution.failover-strategy" was enough to
> fall
> back to the previous behavior.
>
> In 1.10, you can still fall back to the previous behavior by setting
> "jobmanager.scheduler: legacy" and unsetting
> "jobmanager.execution.failover-strategy" in your flink-conf.yaml
>
> I would not consider these issues blockers since there is a workaround for
> them, but of course we would like to see the new scheduler getting some
> production exposure. More detailed release notes about the caveats of the
> new
> scheduler will be added to the user documentation.
>
>
> > The watermark issue was
> https://issues.apache.org/jira/browse/FLINK-14470
>
> This should be fixed now [3].
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-15917
> [2] https://issues.apache.org/jira/browse/FLINK-15918
> [3] https://issues.apache.org/jira/browse/FLINK-8949
>
> On Wed, Feb 5, 2020 at 7:04 AM Thomas Weise  wrote:
>
>> Hi Gary,
>>
>> Thanks for the reply.
>>
>> -->
>>
>> On Tue, Feb 4, 2020 at 5:20 AM Gary Yao  wrote:
>>
>> > Hi Thomas,
>> >
>> > > 2) Was there a change in how job recovery reflects in the uptime
>> metric?
>> > > Didn't uptime previously reset to 0 on recovery (now it just keeps
>> > > increasing)?
>> >
>> > The uptime is the difference between the current time and the time when
>> the
>> > job transitioned to RUNNING state. By default we no longer transition
>> the
>> > job
>> > out of the RUNNING state when restarting. This has something to do with
>> the
>> > new scheduler which enables pipelined region failover by default [1].
>> > Actually
>> > we enabled pipelined region failover already in the binary distribution
>> of
>> > Flink 1.9 by setting:
>> >
>> > jobmanager.execution.failover-strategy: region
>> >
>> > in the default flink-conf.yaml. Unless you have removed this config
>> option
>> > or
>> > you are using a custom yaml, you should be seeing this behavior in Flink
>> > 1.9.
>> > If you do not want region failover, set
>> >
>> > jobmanager.execution.failover-strategy: full
>> >
>> >
>> We are using the default (the jobmanager.execution.failover-strategy
>> setting is not present in our flink config).
>>
>> The change in behavior I see is between the 1.9 based deployment and the
>> 1.10 RC.
>>
>> Our 1.9 branch is here:
>> https://github.com/lyft/flink/tree/release-1.9-lyft
>>
>> I also notice that the exception causing a restart is no longer displayed
>> in the UI, which is probably related?
>>
>>
>> >
>> > > 1) Is the low watermark display in the UI still broken?
>> >
>> > I was not aware that this is broken. Is there an issue tracking this
>> bug?
>> >
>>
>> The watermark issue was https://issues.apache.org/jira/browse/FLINK-14470
>>
>> (I don't have a good way to verify it is fixed at the moment.)
>>
>> Another problem with this 1.10 RC is that the checkpointAlignmentTime
>> metric is missing. (I have not been able to investigate this further yet.)
>>
>>
>> >
>> > Best,
>> > Gary
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-14651
>> >
>> > On Tue, Feb 4, 2020 at 2:56 AM Thomas Weise  wrote:
>> >
>> >> I opened a PR for FLINK-15868
>> >> :
>> >> https://github.com/apache/flink/pull/11006
>> >>
>> >> With that change, I was able to run an application that consumes from
>> >> Kinesis.
>> >>
>> >> I should have data tomorrow regarding the performance.
>> >>
>> >> Two questions/observations:
>> >>
>> >> 1) Is the low watermark display in the UI still broken?
>> >> 2) Was there a change in how job recovery reflects in the uptime
>> metric?
>> >> Didn't uptime previously reset to 0 on recovery (now it just keeps
>> >> increasing)?
>> >>
>> >> Thanks,
>> >> Thomas
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Feb 3, 2020 at 10:55 AM Thomas Weise  wrote:
>> >>
>> >> > I found another issue with the Kinesis connector:
>> >> >
>> >> > https://issues.apache.org/jira/browse/FLINK-15868
>> >> >
>> >> >
>> >> > On Mon, Feb 3, 2020 at 3:35 AM Gary Yao  wrote:
>> >> >
>> >> >> Hi everyone,
>> >> >>
>> >> >> I am hereby 

Re: [Discussion] Job generation / submission hooks & Atlas integration

2020-02-05 Thread Taher Koitawala
As far as I know, Atlas entries can be created with a REST call. Can we not
create an abstracted Flink operator that makes the REST call on job
execution/submission?

Regards,
Taher Koitawala
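
For readers unfamiliar with the Atlas REST API, the idea above could look
roughly like the following minimal sketch, using only the JDK's
HttpURLConnection. The endpoint path, the entity JSON shape and the
flink_application type name are assumptions for illustration and would have
to match the entity definitions discussed in this thread.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Minimal sketch: pushing a (hypothetical) FlinkApplication entity to Atlas via REST. */
public class AtlasRestSketch {

    public static void main(String[] args) throws Exception {
        // Assumed Atlas V2 entity endpoint; adjust host/port/auth for a real setup.
        URL url = new URL("http://atlas-host:21000/api/atlas/v2/entity");

        // Hypothetical payload; the real attributes depend on the Flink entity definition.
        String entityJson =
            "{ \"entity\": {"
                + " \"typeName\": \"flink_application\","
                + " \"attributes\": { \"name\": \"my-flink-job\","
                + " \"inputs\": [], \"outputs\": [] } } }";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(entityJson.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Atlas responded with HTTP " + conn.getResponseCode());
    }
}

Whether such a call runs inside an operator, as suggested here, or in a
submission hook, as proposed later in the thread, is the main design question.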

On Wed, Feb 5, 2020, 10:16 PM Flavio Pompermaier 
wrote:

> Hi Gyula,
> thanks for taking care of integrating Flink with Atlas (and Egeria
> initiative in the end) that is IMHO the most important part of all the
> Hadoop ecosystem and that, unfortunately, was quite overlooked. I can
> confirm that the integration with Atlas/Egeria is absolutely of big
> interest.
>
> On Wed, 5 Feb 2020 at 17:12, Till Rohrmann  wrote:
>
> > Hi Gyula,
> >
> > thanks for starting this discussion. Before diving in the details of how
> to
> > implement this feature, I wanted to ask whether it is strictly required
> > that the Atlas integration lives within Flink or not? Could it also work
> if
> > you have tool which receives job submissions, extracts the required
> > information, forwards the job submission to Flink, monitors the execution
> > result and finally publishes some information to Atlas (modulo some other
> > steps which are missing in my description)? Having a different layer
> being
> > responsible for this would keep complexity out of Flink.
> >
> > Cheers,
> > Till
> >
> > On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra  wrote:
> >
> > > Hi all!
> > >
> > > We have started some preliminary work on the Flink - Atlas integration
> at
> > > Cloudera. It seems that the integration will require some new hook
> > > interfaces at the jobgraph generation and submission phases, so I
> > figured I
> > > will open a discussion thread with my initial ideas to get some early
> > > feedback.
> > >
> > > *Minimal background*
> > > Very simply put Apache Atlas is a data governance framework that stores
> > > metadata for our data and processing logic to track ownership, lineage
> > etc.
> > > It is already integrated with systems like HDFS, Kafka, Hive and many
> > > others.
> > >
> > > Adding Flink integration would mean that we can track the input output
> > data
> > > of our Flink jobs, their owners and how different Flink jobs are
> > connected
> > > to each other through the data they produce (lineage). This seems to
> be a
> > > very big deal for a lot of companies :)
> > >
> > > *Flink - Atlas integration in a nutshell*
> > > In order to integrate with Atlas we basically need 2 things.
> > >  - Flink entity definitions
> > >  - Flink Atlas hook
> > >
> > > The entity definition is the easy part. It is a json that contains the
> > > objects (entities) that we want to store for any give Flink job. As a
> > > starter we could have a single FlinkApplication entity that has a set
> of
> > > inputs and outputs. These inputs/outputs are other Atlas entities that
> > are
> > > already defines such as Kafka topic or Hbase table.
> > >
> > > The Flink atlas hook will be the logic that creates the entity instance
> > and
> > > uploads it to Atlas when we start a new Flink job. This is the part
> where
> > > we implement the core logic.
> > >
> > > *Job submission hook*
> > > In order to implement the Atlas hook we need a place where we can
> inspect
> > > the pipeline, create and send the metadata when the job starts. When we
> > > create the FlinkApplication entity we need to be able to easily
> determine
> > > the sources and sinks (and their properties) of the pipeline.
> > >
> > > Unfortunately there is no JobSubmission hook in Flink that could
> execute
> > > this logic and even if there was one there is a mismatch of abstraction
> > > levels needed to implement the integration.
> > > We could imagine a JobSubmission hook executed in the JobManager runner
> > as
> > > this:
> > >
> > > void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> > > configuration);
> > >
> > > This is nice but the JobGraph makes it super difficult to extract
> sources
> > > and UDFs to create the metadata entity. The atlas entity however could
> be
> > > easily created from the StreamGraph object (used to represent the
> logical
> > > flow) before the JobGraph is generated. To go around this limitation we
> > > could add a JobGraphGeneratorHook interface:
> > >
> > > void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> > > jobGraph);
> > >
> > > We could then generate the atlas entity in the preprocess step and add
> a
> > > jobmission hook in the postprocess step that will simply send the
> already
> > > baked in entity.
> > >
> > > *This kinda works but...*
> > > The approach outlined above seems to work and we have built a POC using
> > it.
> > > Unfortunately it is far from nice as it exposes non-public APIs such as
> > the
> > > StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
> > >
> > > It would be much nicer if we could somehow go back from JobGraph to
> > > StreamGraph or at least have an easy way to access source/sink UDFS.
> > >
> > > What do you think?
> > >
> > > Cheers,
> > > Gyula
> > >
> >
>


Re: [Discussion] Job generation / submission hooks & Atlas integration

2020-02-05 Thread Flavio Pompermaier
Hi Gyula,
thanks for taking care of integrating Flink with Atlas (and, in the end, the
Egeria initiative), which is IMHO the most important part of the whole Hadoop
ecosystem and which, unfortunately, has been quite overlooked. I can confirm
that the integration with Atlas/Egeria is of great interest.

On Wed, 5 Feb 2020 at 17:12, Till Rohrmann  wrote:

> Hi Gyula,
>
> thanks for starting this discussion. Before diving in the details of how to
> implement this feature, I wanted to ask whether it is strictly required
> that the Atlas integration lives within Flink or not? Could it also work if
> you have tool which receives job submissions, extracts the required
> information, forwards the job submission to Flink, monitors the execution
> result and finally publishes some information to Atlas (modulo some other
> steps which are missing in my description)? Having a different layer being
> responsible for this would keep complexity out of Flink.
>
> Cheers,
> Till
>
> On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra  wrote:
>
> > Hi all!
> >
> > We have started some preliminary work on the Flink - Atlas integration at
> > Cloudera. It seems that the integration will require some new hook
> > interfaces at the jobgraph generation and submission phases, so I
> figured I
> > will open a discussion thread with my initial ideas to get some early
> > feedback.
> >
> > *Minimal background*
> > Very simply put Apache Atlas is a data governance framework that stores
> > metadata for our data and processing logic to track ownership, lineage
> etc.
> > It is already integrated with systems like HDFS, Kafka, Hive and many
> > others.
> >
> > Adding Flink integration would mean that we can track the input output
> data
> > of our Flink jobs, their owners and how different Flink jobs are
> connected
> > to each other through the data they produce (lineage). This seems to be a
> > very big deal for a lot of companies :)
> >
> > *Flink - Atlas integration in a nutshell*
> > In order to integrate with Atlas we basically need 2 things.
> >  - Flink entity definitions
> >  - Flink Atlas hook
> >
> > The entity definition is the easy part. It is a json that contains the
> > objects (entities) that we want to store for any give Flink job. As a
> > starter we could have a single FlinkApplication entity that has a set of
> > inputs and outputs. These inputs/outputs are other Atlas entities that
> are
> > already defines such as Kafka topic or Hbase table.
> >
> > The Flink atlas hook will be the logic that creates the entity instance
> and
> > uploads it to Atlas when we start a new Flink job. This is the part where
> > we implement the core logic.
> >
> > *Job submission hook*
> > In order to implement the Atlas hook we need a place where we can inspect
> > the pipeline, create and send the metadata when the job starts. When we
> > create the FlinkApplication entity we need to be able to easily determine
> > the sources and sinks (and their properties) of the pipeline.
> >
> > Unfortunately there is no JobSubmission hook in Flink that could execute
> > this logic and even if there was one there is a mismatch of abstraction
> > levels needed to implement the integration.
> > We could imagine a JobSubmission hook executed in the JobManager runner
> as
> > this:
> >
> > void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> > configuration);
> >
> > This is nice but the JobGraph makes it super difficult to extract sources
> > and UDFs to create the metadata entity. The atlas entity however could be
> > easily created from the StreamGraph object (used to represent the logical
> > flow) before the JobGraph is generated. To go around this limitation we
> > could add a JobGraphGeneratorHook interface:
> >
> > void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> > jobGraph);
> >
> > We could then generate the atlas entity in the preprocess step and add a
> > jobmission hook in the postprocess step that will simply send the already
> > baked in entity.
> >
> > *This kinda works but...*
> > The approach outlined above seems to work and we have built a POC using
> it.
> > Unfortunately it is far from nice as it exposes non-public APIs such as
> the
> > StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
> >
> > It would be much nicer if we could somehow go back from JobGraph to
> > StreamGraph or at least have an easy way to access source/sink UDFS.
> >
> > What do you think?
> >
> > Cheers,
> > Gyula
> >
>


Re: [DISCUSS] Release flink-shaded 10.0

2020-02-05 Thread Till Rohrmann
Thanks for starting this discussion Chesnay. +1 for starting a new
flink-shaded release.

Cheers,
Till

On Wed, Feb 5, 2020 at 2:10 PM Chesnay Schepler  wrote:

> Hello,
>
> I would like to kick off the next release of flink-shaded. The main
> feature are new modules that bundle zookeeper, that will allow
> us to support zk 3.4 and 3.5 .
>
> Additionally we fixed an issue where slightly older dependencies than
> intended were bundled in the flink-shaded-hadoop-2-uber jar, which was
> flagged by security checks.
>
> Are there any other changes that people are interested in doing?
>
>
> Regards,
>
> Chesnay
>
>


Re: [Discussion] Job generation / submission hooks & Atlas integration

2020-02-05 Thread Till Rohrmann
Hi Gyula,

thanks for starting this discussion. Before diving into the details of how to
implement this feature, I wanted to ask whether it is strictly required
that the Atlas integration lives within Flink. Could it also work if
you had a tool which receives job submissions, extracts the required
information, forwards the job submission to Flink, monitors the execution
result and finally publishes some information to Atlas (modulo some other
steps which are missing in my description)? Having a different layer be
responsible for this would keep complexity out of Flink.

Cheers,
Till

On Wed, Feb 5, 2020 at 12:48 PM Gyula Fóra  wrote:

> Hi all!
>
> We have started some preliminary work on the Flink - Atlas integration at
> Cloudera. It seems that the integration will require some new hook
> interfaces at the jobgraph generation and submission phases, so I figured I
> will open a discussion thread with my initial ideas to get some early
> feedback.
>
> *Minimal background*
> Very simply put Apache Atlas is a data governance framework that stores
> metadata for our data and processing logic to track ownership, lineage etc.
> It is already integrated with systems like HDFS, Kafka, Hive and many
> others.
>
> Adding Flink integration would mean that we can track the input output data
> of our Flink jobs, their owners and how different Flink jobs are connected
> to each other through the data they produce (lineage). This seems to be a
> very big deal for a lot of companies :)
>
> *Flink - Atlas integration in a nutshell*
> In order to integrate with Atlas we basically need 2 things.
>  - Flink entity definitions
>  - Flink Atlas hook
>
> The entity definition is the easy part. It is a json that contains the
> objects (entities) that we want to store for any give Flink job. As a
> starter we could have a single FlinkApplication entity that has a set of
> inputs and outputs. These inputs/outputs are other Atlas entities that are
> already defines such as Kafka topic or Hbase table.
>
> The Flink atlas hook will be the logic that creates the entity instance and
> uploads it to Atlas when we start a new Flink job. This is the part where
> we implement the core logic.
>
> *Job submission hook*
> In order to implement the Atlas hook we need a place where we can inspect
> the pipeline, create and send the metadata when the job starts. When we
> create the FlinkApplication entity we need to be able to easily determine
> the sources and sinks (and their properties) of the pipeline.
>
> Unfortunately there is no JobSubmission hook in Flink that could execute
> this logic and even if there was one there is a mismatch of abstraction
> levels needed to implement the integration.
> We could imagine a JobSubmission hook executed in the JobManager runner as
> this:
>
> void onSuccessfulSubmission(JobGraph jobGraph, Configuration
> configuration);
>
> This is nice but the JobGraph makes it super difficult to extract sources
> and UDFs to create the metadata entity. The atlas entity however could be
> easily created from the StreamGraph object (used to represent the logical
> flow) before the JobGraph is generated. To go around this limitation we
> could add a JobGraphGeneratorHook interface:
>
> void preProcess(StreamGraph streamGraph); void postProcess(JobGraph
> jobGraph);
>
> We could then generate the atlas entity in the preprocess step and add a
> jobmission hook in the postprocess step that will simply send the already
> baked in entity.
>
> *This kinda works but...*
> The approach outlined above seems to work and we have built a POC using it.
> Unfortunately it is far from nice as it exposes non-public APIs such as the
> StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.
>
> It would be much nicer if we could somehow go back from JobGraph to
> StreamGraph or at least have an easy way to access source/sink UDFS.
>
> What do you think?
>
> Cheers,
> Gyula
>


[jira] [Created] (FLINK-15926) Add DataStream.broadcast(StateDescriptor) to available transformations docs

2020-02-05 Thread Kostas Kloudas (Jira)
Kostas Kloudas created FLINK-15926:
--

 Summary: Add DataStream.broadcast(StateDescriptor) to available 
transformations docs
 Key: FLINK-15926
 URL: https://issues.apache.org/jira/browse/FLINK-15926
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Documentation
Affects Versions: 1.10.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15925) TaskExecutors don't work out-of-the-box on Windows

2020-02-05 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-15925:


 Summary: TaskExecutors don't work out-of-the-box on Windows
 Key: FLINK-15925
 URL: https://issues.apache.org/jira/browse/FLINK-15925
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Affects Versions: 1.10.0
Reporter: Chesnay Schepler


{code}
org.apache.flink.configuration.IllegalConfigurationException: Failed to create TaskExecutorResourceSpec
    at org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:72) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:356) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.<init>(TaskManagerRunner.java:152) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:308) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$2(TaskManagerRunner.java:322) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:321) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:287) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The required configuration option Key: 'taskmanager.cpu.cores' , default: null (fallback keys: []) is not set
    at org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkConfigOptionIsSet(TaskExecutorResourceUtils.java:90) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.lambda$checkTaskExecutorResourceConfigSet$0(TaskExecutorResourceUtils.java:84) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at java.util.Arrays$ArrayList.forEach(Arrays.java:4390) ~[?:?]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.checkTaskExecutorResourceConfigSet(TaskExecutorResourceUtils.java:84) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:70) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    ... 7 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15924) Detect and log blocking main thread operations

2020-02-05 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-15924:
-

 Summary: Detect and log blocking main thread operations
 Key: FLINK-15924
 URL: https://issues.apache.org/jira/browse/FLINK-15924
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Till Rohrmann
 Fix For: 1.11.0


When using the {{RpcEndpoint}} it is important that all operations which run on 
the main thread are never blocking. We have seen in the past that it is quite 
hard to always catch blocking operations in reviews and sometimes these changes 
caused instabilities in Flink. Once this happens it is not trivial to find the 
culprit which is responsible for the blocking operation.

One way to make debugging easier is to add a monitor which detects and logs if 
a {{RpcEndpoint}} operation takes longer than {{n}} seconds for example. 
Depending on the overhead of this monitor one could even think about enabling 
it only via a special configuration (e.g. debug mode).

A proper place to introduce this monitor could be the {{AkkaRpcActor}}, which is 
responsible for executing main thread operations. Whenever we schedule an 
operation, we could start a timeout which, if it triggers before the operation 
has completed, will log a warning.
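
For illustration, a minimal sketch of such a monitor outside of Akka; the class name and 
threshold are made up, and the real implementation would live in the {{AkkaRpcActor}} and 
use Flink's logging:

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of a main-thread blocking monitor; names and threshold are illustrative. */
public class MainThreadMonitorSketch {

    private static final long WARN_THRESHOLD_MS = 1_000; // the "n seconds" from this ticket

    private final ScheduledExecutorService watchdog =
        Executors.newSingleThreadScheduledExecutor();

    public void runMonitored(String description, Runnable operation) {
        // Arm a timer that fires only if the operation is still running after the threshold.
        ScheduledFuture<?> warning = watchdog.schedule(
            () -> System.err.println(
                "WARN: main thread operation '" + description
                    + "' is taking longer than " + WARN_THRESHOLD_MS + " ms"),
            WARN_THRESHOLD_MS, TimeUnit.MILLISECONDS);
        try {
            operation.run();
        } finally {
            warning.cancel(false); // cancel the pending warning if it has not fired yet
        }
    }
}
{code}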



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #0

2020-02-05 Thread Chesnay Schepler

-1

- this is not a source release by definition, since a source release 
must not contain binaries. This is a convenience binary, or possibly 
even a distribution-channel-appropriate version of our existing 
convenience binary. A user downloading this package should know what 
they are downloading.
We have never released a binary without a corresponding source release, 
and don't really have established processes for this, nor for 
distribution channels other than Maven. Technically speaking we don't 
require a vote, but it is something that the PMC has to decide.


- the artifact name is not descriptive, as it neither says that it is a 
binary nor that it is a Python/PyPI-specific release


- Development status classifier seems incorrect as it is set to "Planning"


On 05/02/2020 09:03, jincheng sun wrote:

Hi Wei,

Thanks for your vote and I appreciate that you kindly help to take the
ticket.

  I've assigned the JIRAs to you!

Best,
Jincheng


Wei Zhong  wrote on Wednesday, Feb 5, 2020 at 3:55 PM:


Hi,

Thanks for driving this, Jincheng.

+1 (non-binding)

- Verified signatures and checksums.
- `pip install apache-flink-1.9.2.tar.gz` successfully.
- Start local pyflink shell via `pyflink-shell.sh local` and try the
examples in the help message, run well and no exception.
- Try a word count example in IDE, run well and no exception.

In addition I'm willing to take these JIRAs. Could you assign them to me?
:)

Best,
Wei



On Feb 5, 2020, at 14:49, jincheng sun  wrote:

Hi everyone,

Please review and vote on the release candidate #0 for the PyFlink

version

1.9.2, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* the official Apache source release and binary convenience releases to

be

deployed to dist.apache.org [1], which are signed with the key with
fingerprint 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2],
* source code tag "release-1.9.2" [3],
* create JIRA. for add description of support 'pip install' to 1.9.x
documents[4]
* create JIRA. for add PyPI release process for subsequent version

release

of 1.9.x . i.e. improve the script of `create-binary-release. sh`.[5]

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Jincheng

[1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc0/
[2] https://dist.apache.org/repos/dist/release/flink/KEYS
[3] https://github.com/apache/flink/tree/release-1.9.2
[4] https://issues.apache.org/jira/browse/FLINK-15908
[5] https://issues.apache.org/jira/browse/FLINK-15909






Re: [DISCUSS] have separate Flink distributions with built-in Hive dependencies

2020-02-05 Thread Stephan Ewen
Some thoughts about other options we have:

  - Put fat/shaded jars for the common versions into "flink-shaded" and
offer them for download on the website, similar to the pre-bundled Hadoop
versions.

  - Look at the Presto code (Metastore protocol) and see if we can reuse
that

  - Have a setup helper script that takes the versions and pulls the
required dependencies.

Can you share how a "built-in" dependency could work, if there are so
many different conflicting versions?

Thanks,
Stephan
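
As a point of reference for the "hive module (loader)" idea quoted further down in
this thread, here is a minimal sketch of what an environment-variable-driven loader
could look like. The HIVE_CLASSPATH variable name and the reflective probing are
taken from that proposal and are assumptions, not existing Flink code.

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

/** Sketch: load Hive jars from an environment variable into a separate classloader. */
public final class HiveClasspathLoaderSketch {

    public static ClassLoader loadHiveClassLoader() throws Exception {
        // Assumed variable, e.g. HIVE_CLASSPATH=/opt/hive/lib/hive-exec.jar:/opt/hive/lib/...
        String hiveClasspath = System.getenv("HIVE_CLASSPATH");
        if (hiveClasspath == null) {
            throw new IllegalStateException("HIVE_CLASSPATH is not set");
        }
        List<URL> jarUrls = new ArrayList<>();
        for (String path : hiveClasspath.split(File.pathSeparator)) {
            jarUrls.add(new File(path).toURI().toURL());
        }
        // A child classloader keeps the user-provided Hive version isolated from flink/lib.
        return new URLClassLoader(jarUrls.toArray(new URL[0]),
                HiveClasspathLoaderSketch.class.getClassLoader());
    }

    public static Object instantiateIfPresent(ClassLoader hiveLoader, String className) {
        try {
            // e.g. probe for a catalog/function factory and instantiate it reflectively.
            return Class.forName(className, true, hiveLoader)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            return null; // class not on the user's Hive classpath -> feature stays disabled
        }
    }
}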


On Tue, Feb 4, 2020 at 12:59 PM Rui Li  wrote:

> Hi Stephan,
>
> As Jingsong stated, in our documentation the recommended way to add Hive
> deps is to use exactly what users have installed. It's just we ask users to
> manually add those jars, instead of automatically find them based on env
> variables. I prefer to keep it this way for a while, and see if there're
> real concerns/complaints from user feedbacks.
>
> Please also note the Hive jars are not the only ones needed to integrate
> with Hive, users have to make sure flink-connector-hive and Hadoop jars are
> in classpath too. So I'm afraid a single "HIVE" env variable wouldn't save
> all the manual work for our users.
>
> On Tue, Feb 4, 2020 at 5:54 PM Jingsong Li  wrote:
>
> > Hi all,
> >
> > For your information, we have document the dependencies detailed
> > information [1]. I think it's a lot clearer than before, but it's worse
> > than presto and spark (they avoid or have built-in hive dependency).
> >
> > I thought about Stephan's suggestion:
> > - The hive/lib has 200+ jars, but we only need hive-exec.jar or plus two
> > or three jars, if so many jars are introduced, maybe will there be a big
> > conflict.
> > - And hive/lib is not available on every machine. We need to upload so
> > many jars.
> > - A separate classloader maybe hard to work too, our flink-connector-hive
> > need hive jars, we may need to deal with flink-connector-hive jar spacial
> > too.
> > CC: Rui Li
> >
> > I think the best system to integrate with hive is presto, which only
> > connects hive metastore through thrift protocol. But I understand that it
> > costs a lot to rewrite the code.
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Feb 4, 2020 at 1:44 AM Stephan Ewen  wrote:
> >
> >> We have had much trouble in the past from "too deep too custom"
> >> integrations that everyone got out of the box, i.e., Hadoop.
> >> Flink has has such a broad spectrum of use cases, if we have custom
> build
> >> for every other framework in that spectrum, we'll be in trouble.
> >>
> >> So I would also be -1 for custom builds.
> >>
> >> Couldn't we do something similar as we started doing for Hadoop? Moving
> >> away from convenience downloads to allowing users to "export" their
> setup
> >> for Flink?
> >>
> >>   - We can have a "hive module (loader)" in flink/lib by default
> >>   - The module loader would look for an environment variable like
> >> "HIVE_CLASSPATH" and load these classes (ideally in a separate
> >> classloader).
> >>   - The loader can search for certain classes and instantiate catalog /
> >> functions / etc. when finding them instantiates the hive module
> >> referencing
> >> them
> >>   - That way, we use exactly what users have installed, without needing
> to
> >> build our own bundles.
> >>
> >> Could that work?
> >>
> >> Best,
> >> Stephan
> >>
> >>
> >> On Wed, Dec 18, 2019 at 9:43 AM Till Rohrmann 
> >> wrote:
> >>
> >> > Couldn't it simply be documented which jars are in the convenience
> jars
> >> > which are pre built and can be downloaded from the website? Then
> people
> >> who
> >> > need a custom version know which jars they need to provide to Flink?
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Tue, Dec 17, 2019 at 6:49 PM Bowen Li  wrote:
> >> >
> >> > > I'm not sure providing an uber jar would be possible.
> >> > >
> >> > > Different from kafka and elasticsearch connector who have
> dependencies
> >> > for
> >> > > a specific kafka/elastic version, or the kafka universal connector
> >> that
> >> > > provides good compatibilities, hive connector needs to deal with
> hive
> >> > jars
> >> > > in all 1.x, 2.x, 3.x versions (let alone all the HDP/CDH
> >> distributions)
> >> > > with incompatibility even between minor versions, different
> versioned
> >> > > hadoop and other extra dependency jars for each hive version.
> >> > >
> >> > > Besides, users usually need to be able to easily see which
> individual
> >> > jars
> >> > > are required, which is invisible from an uber jar. Hive users
> already
> >> > have
> >> > > their hive deployments. They usually have to use their own hive jars
> >> > > because, unlike hive jars on mvn, their own jars contain changes
> >> in-house
> >> > > or from vendors. They need to easily tell which jars Flink requires
> >> for
> >> > > corresponding open sourced hive version to their own hive
> deployment,
> >> and
> >> > > copy 

[jira] [Created] (FLINK-15923) Remove DISCARDED in TaskAcknowledgeResult

2020-02-05 Thread Jiayi Liao (Jira)
Jiayi Liao created FLINK-15923:
--

 Summary: Remove DISCARDED in TaskAcknowledgeResult
 Key: FLINK-15923
 URL: https://issues.apache.org/jira/browse/FLINK-15923
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.9.2
Reporter: Jiayi Liao


{{TaskAcknowledgeResult.DISCARDED}} is returned only when the checkpoint is 
discarded and removed from the {{pendingCheckpoints}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] Release flink-shaded 10.0

2020-02-05 Thread Chesnay Schepler

Hello,

I would like to kick off the next release of flink-shaded. The main 
feature is a set of new modules that bundle ZooKeeper, which will allow 
us to support ZooKeeper 3.4 and 3.5.


Additionally we fixed an issue where slightly older dependencies than 
intended were bundled in the flink-shaded-hadoop-2-uber jar, which was 
flagged by security checks.


Are there any other changes that people are interested in doing?


Regards,

Chesnay



Re: [VOTE] Release 1.10.0, release candidate #1

2020-02-05 Thread Gary Yao
> also notice that the exception causing a restart is no longer displayed
> in the UI, which is probably related?

Yes, this is also related to the new scheduler. I created FLINK-15917 [1] to
track this. Moreover, I created a ticket about the uptime metric not
resetting
[2]. Both issues already exist in 1.9 if
"jobmanager.execution.failover-strategy" is set to "region", which is the
case
in the default flink-conf.yaml.

In 1.9, unsetting "jobmanager.execution.failover-strategy" was enough to
fall
back to the previous behavior.

In 1.10, you can still fall back to the previous behavior by setting
"jobmanager.scheduler: legacy" and unsetting
"jobmanager.execution.failover-strategy" in your flink-conf.yaml

I would not consider these issues blockers since there is a workaround for
them, but of course we would like to see the new scheduler getting some
production exposure. More detailed release notes about the caveats of the
new
scheduler will be added to the user documentation.


> The watermark issue was https://issues.apache.org/jira/browse/FLINK-14470

This should be fixed now [3].


[1] https://issues.apache.org/jira/browse/FLINK-15917
[2] https://issues.apache.org/jira/browse/FLINK-15918
[3] https://issues.apache.org/jira/browse/FLINK-8949

On Wed, Feb 5, 2020 at 7:04 AM Thomas Weise  wrote:

> Hi Gary,
>
> Thanks for the reply.
>
> -->
>
> On Tue, Feb 4, 2020 at 5:20 AM Gary Yao  wrote:
>
> > Hi Thomas,
> >
> > > 2) Was there a change in how job recovery reflects in the uptime
> metric?
> > > Didn't uptime previously reset to 0 on recovery (now it just keeps
> > > increasing)?
> >
> > The uptime is the difference between the current time and the time when
> the
> > job transitioned to RUNNING state. By default we no longer transition the
> > job
> > out of the RUNNING state when restarting. This has something to do with
> the
> > new scheduler which enables pipelined region failover by default [1].
> > Actually
> > we enabled pipelined region failover already in the binary distribution
> of
> > Flink 1.9 by setting:
> >
> > jobmanager.execution.failover-strategy: region
> >
> > in the default flink-conf.yaml. Unless you have removed this config
> option
> > or
> > you are using a custom yaml, you should be seeing this behavior in Flink
> > 1.9.
> > If you do not want region failover, set
> >
> > jobmanager.execution.failover-strategy: full
> >
> >
> We are using the default (the jobmanager.execution.failover-strategy
> setting is not present in our flink config).
>
> The change in behavior I see is between the 1.9 based deployment and the
> 1.10 RC.
>
> Our 1.9 branch is here:
> https://github.com/lyft/flink/tree/release-1.9-lyft
>
> I also notice that the exception causing a restart is no longer displayed
> in the UI, which is probably related?
>
>
> >
> > > 1) Is the low watermark display in the UI still broken?
> >
> > I was not aware that this is broken. Is there an issue tracking this bug?
> >
>
> The watermark issue was https://issues.apache.org/jira/browse/FLINK-14470
>
> (I don't have a good way to verify it is fixed at the moment.)
>
> Another problem with this 1.10 RC is that the checkpointAlignmentTime
> metric is missing. (I have not been able to investigate this further yet.)
>
>
> >
> > Best,
> > Gary
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-14651
> >
> > On Tue, Feb 4, 2020 at 2:56 AM Thomas Weise  wrote:
> >
> >> I opened a PR for FLINK-15868
> >> :
> >> https://github.com/apache/flink/pull/11006
> >>
> >> With that change, I was able to run an application that consumes from
> >> Kinesis.
> >>
> >> I should have data tomorrow regarding the performance.
> >>
> >> Two questions/observations:
> >>
> >> 1) Is the low watermark display in the UI still broken?
> >> 2) Was there a change in how job recovery reflects in the uptime metric?
> >> Didn't uptime previously reset to 0 on recovery (now it just keeps
> >> increasing)?
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >>
> >>
> >> On Mon, Feb 3, 2020 at 10:55 AM Thomas Weise  wrote:
> >>
> >> > I found another issue with the Kinesis connector:
> >> >
> >> > https://issues.apache.org/jira/browse/FLINK-15868
> >> >
> >> >
> >> > On Mon, Feb 3, 2020 at 3:35 AM Gary Yao  wrote:
> >> >
> >> >> Hi everyone,
> >> >>
> >> >> I am hereby canceling the vote due to:
> >> >>
> >> >> FLINK-15837
> >> >> FLINK-15840
> >> >>
> >> >> Another RC will be created later today.
> >> >>
> >> >> Best,
> >> >> Gary
> >> >>
> >> >> On Mon, Jan 27, 2020 at 10:06 PM Gary Yao  wrote:
> >> >>
> >> >> > Hi everyone,
> >> >> > Please review and vote on the release candidate #1 for the version
> >> >> 1.10.0,
> >> >> > as follows:
> >> >> > [ ] +1, Approve the release
> >> >> > [ ] -1, Do not approve the release (please provide specific
> comments)
> >> >> >
> >> >> >
> >> >> > The complete staging area is available for your review, which
> >> includes:
> >> >> > * JIRA release notes [1],
> >> 

[jira] [Created] (FLINK-15922) Show "Warn - received late message for checkpoint" only when checkpoint actually expired

2020-02-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-15922:


 Summary: Show "Warn - received late message for checkpoint" only 
when checkpoint actually expired
 Key: FLINK-15922
 URL: https://issues.apache.org/jira/browse/FLINK-15922
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.10.0
Reporter: Stephan Ewen
 Fix For: 1.11.0


The message "Warn - received late message for checkpoint" is shown frequently 
in the logs, also when a checkpoint was purposefully canceled.
In those case, this message is unhelpful and misleading.

We should log this only when the checkpoint is actually expired.

That means: when receiving the message, we check whether we have an expired 
checkpoint for that ID. If yes, we log the warning; if not, we simply drop the 
message.
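
For illustration, a sketch of the described check, assuming the coordinator keeps a 
small cache of recently expired checkpoint IDs (field and method names are 
illustrative, not the actual CheckpointCoordinator API):

{code}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: only warn about late acknowledgements whose checkpoint actually expired. */
class LateMessageFilterSketch {

    // Hypothetical cache of checkpoint IDs that were declined because they timed out.
    private final Set<Long> expiredCheckpointIds = ConcurrentHashMap.newKeySet();

    void onCheckpointExpired(long checkpointId) {
        expiredCheckpointIds.add(checkpointId);
    }

    void onLateAcknowledge(long checkpointId, String taskName) {
        if (expiredCheckpointIds.contains(checkpointId)) {
            System.out.println(
                "WARN: received late message for expired checkpoint " + checkpointId
                    + " from task " + taskName);
        }
        // Otherwise (e.g. the checkpoint was purposefully canceled): drop silently.
    }
}
{code}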



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15921) PYTHON exited with EXIT CODE: 143 in travis-ci

2020-02-05 Thread sunjincheng (Jira)
sunjincheng created FLINK-15921:
---

 Summary: PYTHON exited with EXIT CODE: 143 in travis-ci
 Key: FLINK-15921
 URL: https://issues.apache.org/jira/browse/FLINK-15921
 Project: Flink
  Issue Type: Test
  Components: Build System
Affects Versions: 1.10.0, 1.11.0
Reporter: sunjincheng


Currently, some Travis CI failures occur, such as [1], [2]. The reason for the 
failures is that the Python dependency `grpcio` released its latest version 1.27.0 
[3] today, which means the test cache does not yet contain the latest dependency 
and the download from the repo times out. The problem should be fixed after the 
first successful download, provided the network is in good condition. I am still 
watching the latest build [4]. If it keeps failing for a long time, we will try to 
pin a lower version of `grpcio` or optimize the current test case. We will watch 
for a while (until tomorrow).

 

What do you think?

  

[1][https://travis-ci.org/apache/flink/builds/646250268]
[2][https://travis-ci.org/apache/flink/jobs/646281060]

[3] [https://pypi.org/project/grpcio/#files]

[4][https://travis-ci.org/apache/flink/builds/646355253]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-05 Thread Kurt Young
Hi Zhenghua,

After removing TableSource::getTableSchema, during optimization, I could
imagine
the schema information might come from relational nodes such as TableScan.

Best,
Kurt


On Wed, Feb 5, 2020 at 8:24 PM Kurt Young  wrote:

> Hi Jingsong,
>
> Yes current TableFactory is not ideal for users to use either. I think we
> should
> also spend some time in 1.11 to improve the usability of TableEnvironment
> when
> users trying to read or write something. Automatic scheme inference would
> be
> one of them. Other from this, we also support convert a DataStream to
> Table, which
> can serve some flexible requirements to read or write data.
>
> Best,
> Kurt
>
>
> On Wed, Feb 5, 2020 at 7:29 PM Zhenghua Gao  wrote:
>
>> +1 to remove these methods.
>>
>> One concern about invocations of TableSource::getTableSchema:
>> By removing such methods, we can stop calling TableSource::getTableSchema
>> in some place(such
>> as BatchTableEnvImpl/TableEnvironmentImpl#validateTableSource,
>> ConnectorCatalogTable, TableSourceQueryOperation).
>>
>> But in other place we need field types and names of the table source(such
>> as
>> BatchExecLookupJoinRule/StreamExecLookupJoinRule,
>> PushProjectIntoTableSourceScanRule,
>> CommonLookupJoin).  So how should we deal with this?
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Wed, Feb 5, 2020 at 2:36 PM Kurt Young  wrote:
>>
>> > Hi all,
>> >
>> > I'd like to bring up a discussion about removing registration of
>> > TableSource and
>> > TableSink in TableEnvironment as well as in ConnectTableDescriptor. The
>> > affected
>> > method would be:
>> >
>> > TableEnvironment::registerTableSource
>> > TableEnvironment::fromTableSource
>> > TableEnvironment::registerTableSink
>> > ConnectTableDescriptor::registerTableSource
>> > ConnectTableDescriptor::registerTableSink
>> > ConnectTableDescriptor::registerTableSourceAndSink
>> >
>> > (Most of them are already deprecated, except for
>> > TableEnvironment::fromTableSource,
>> > which was intended to deprecate but missed by accident).
>> >
>> > FLIP-64 [1] already explained why we want to deprecate TableSource &
>> > TableSink from
>> > user's interface. In a short word, these interfaces should only read &
>> > write the physical
>> > representation of the table, and they are not fitting well after we
>> already
>> > introduced some
>> > logical table fields such as computed column, watermarks.
>> >
>> > Another reason is the exposure of registerTableSource in Table Env just
>> > make the whole
>> > SQL protocol opposite. TableSource should be used as a reader of table,
>> it
>> > should rely on
>> > other metadata information held by framework, which eventually comes
>> from
>> > DDL or
>> > ConnectDescriptor. But if we register a TableSource to Table Env, we
>> have
>> > no choice but
>> > have to rely on TableSource::getTableSchema. It will make the design
>> > obscure, sometimes
>> > TableSource should trust the information comes from framework, but
>> > sometimes it should
>> > also generate its own schema information.
>> >
>> > Furthermore, if the authority about schema information is not clear, it
>> > will make things much
>> > more complicated if we want to improve the table api usability such as
>> > introducing automatic
>> > schema inference in the near future.
>> >
>> > Since this is an API break change, I've also included user mailing list
>> to
>> > gather more feedbacks.
>> >
>> > Best,
>> > Kurt
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
>> >
>>
>


Re: [DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-05 Thread Kurt Young
Hi Jingsong,

Yes, the current TableFactory is not ideal for users either. I think we should
also spend some time in 1.11 to improve the usability of TableEnvironment when
users are trying to read or write something. Automatic schema inference would be
one such improvement. Apart from this, we also support converting a DataStream to
a Table, which can serve some flexible requirements for reading or writing data
(see the sketch below).
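
A rough sketch of that direction against the 1.10-era API (sqlUpdate / fromDataStream);
the table name, schema and connector properties are placeholders, not a tested
configuration:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/** Sketch only: DDL-based registration instead of registerTableSource. */
public class DdlInsteadOfRegisterSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // The framework, not the TableSource, owns the schema: it comes from the DDL.
        tEnv.sqlUpdate(
            "CREATE TABLE orders ("
                + "  order_id BIGINT,"
                + "  amount DOUBLE"
                + ") WITH ("
                + "  'connector.type' = 'filesystem',"   // placeholder connector options
                + "  'connector.path' = '/tmp/orders',"
                + "  'format.type' = 'csv'"
                + ")");

        // Flexible cases can still go through a DataStream -> Table conversion.
        DataStream<Tuple2<Long, Double>> stream = env.fromElements(Tuple2.of(1L, 10.0));
        Table fromStream = tEnv.fromDataStream(stream, "order_id, amount");

        Table result = tEnv.sqlQuery("SELECT order_id, amount FROM orders WHERE amount > 5");
    }
}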

Best,
Kurt


On Wed, Feb 5, 2020 at 7:29 PM Zhenghua Gao  wrote:

> +1 to remove these methods.
>
> One concern about invocations of TableSource::getTableSchema:
> By removing such methods, we can stop calling TableSource::getTableSchema
> in some place(such
> as BatchTableEnvImpl/TableEnvironmentImpl#validateTableSource,
> ConnectorCatalogTable, TableSourceQueryOperation).
>
> But in other place we need field types and names of the table source(such
> as
> BatchExecLookupJoinRule/StreamExecLookupJoinRule,
> PushProjectIntoTableSourceScanRule,
> CommonLookupJoin).  So how should we deal with this?
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Wed, Feb 5, 2020 at 2:36 PM Kurt Young  wrote:
>
> > Hi all,
> >
> > I'd like to bring up a discussion about removing registration of
> > TableSource and
> > TableSink in TableEnvironment as well as in ConnectTableDescriptor. The
> > affected
> > method would be:
> >
> > TableEnvironment::registerTableSource
> > TableEnvironment::fromTableSource
> > TableEnvironment::registerTableSink
> > ConnectTableDescriptor::registerTableSource
> > ConnectTableDescriptor::registerTableSink
> > ConnectTableDescriptor::registerTableSourceAndSink
> >
> > (Most of them are already deprecated, except for
> > TableEnvironment::fromTableSource,
> > which was intended to deprecate but missed by accident).
> >
> > FLIP-64 [1] already explained why we want to deprecate TableSource &
> > TableSink from
> > user's interface. In a short word, these interfaces should only read &
> > write the physical
> > representation of the table, and they are not fitting well after we
> already
> > introduced some
> > logical table fields such as computed column, watermarks.
> >
> > Another reason is the exposure of registerTableSource in Table Env just
> > make the whole
> > SQL protocol opposite. TableSource should be used as a reader of table,
> it
> > should rely on
> > other metadata information held by framework, which eventually comes from
> > DDL or
> > ConnectDescriptor. But if we register a TableSource to Table Env, we have
> > no choice but
> > have to rely on TableSource::getTableSchema. It will make the design
> > obscure, sometimes
> > TableSource should trust the information comes from framework, but
> > sometimes it should
> > also generate its own schema information.
> >
> > Furthermore, if the authority about schema information is not clear, it
> > will make things much
> > more complicated if we want to improve the table api usability such as
> > introducing automatic
> > schema inference in the near future.
> >
> > Since this is an API break change, I've also included user mailing list
> to
> > gather more feedbacks.
> >
> > Best,
> > Kurt
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
> >
>


[jira] [Created] (FLINK-15920) Show thread name in logs on CI

2020-02-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-15920:


 Summary: Show thread name in logs on CI
 Key: FLINK-15920
 URL: https://issues.apache.org/jira/browse/FLINK-15920
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.11.0, 1.10.1


Having thread names in log lines make it much easier to understand from which 
task they come.

Enabling that by default on the CI setup helps with analyzing bugs and unstable 
tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15919) MemoryManager shouldn't allow releasing more memory than reserved

2020-02-05 Thread Yu Li (Jira)
Yu Li created FLINK-15919:
-

 Summary: MemoryManager shouldn't allow releasing more memory than 
reserved
 Key: FLINK-15919
 URL: https://issues.apache.org/jira/browse/FLINK-15919
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.10.0
Reporter: Yu Li


Currently {{MemoryManager}} allows releasing more memory than was reserved for an 
owner object, which can be reproduced by adding the test case below to 
{{MemoryManagerTest}}:

{code}
@Test
public void testMemoryReleaseGuard() throws MemoryReservationException {
    Object owner = new Object();
    Object owner2 = new Object();

    long totalHeapMemorySize = memoryManager.getMemorySizeByType(MemoryType.HEAP);

    // Reserve one page for each owner.
    memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
    memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE);

    // Release two pages for the first owner, i.e. more than it ever reserved.
    memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
    memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);

    // owner2 still holds one page, so exactly one page should remain accounted for.
    long heapMemoryLeft = memoryManager.getMemorySizeByType(MemoryType.HEAP);
    assertEquals("Memory leak happens", totalHeapMemorySize - PAGE_SIZE, heapMemoryLeft);
}
{code}
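
For illustration only, a guard of the kind this ticket asks for could track per-owner 
reservations and refuse to release more than was reserved. This is a sketch with 
assumed data structures, not the actual MemoryManager internals:

{code}
import java.util.HashMap;
import java.util.Map;

/** Sketch of a per-owner release guard; not the real MemoryManager implementation. */
class ReservationGuardSketch {

    private final Map<Object, Long> reservedBytesPerOwner = new HashMap<>();

    synchronized void reserve(Object owner, long bytes) {
        reservedBytesPerOwner.merge(owner, bytes, Long::sum);
    }

    synchronized void release(Object owner, long bytes) {
        long reserved = reservedBytesPerOwner.getOrDefault(owner, 0L);
        if (bytes > reserved) {
            // This is the guard: fail loudly instead of silently corrupting the accounting.
            throw new IllegalArgumentException(
                "Trying to release " + bytes + " bytes but only " + reserved + " are reserved");
        }
        if (reserved == bytes) {
            reservedBytesPerOwner.remove(owner);
        } else {
            reservedBytesPerOwner.put(owner, reserved - bytes);
        }
    }
}
{code}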



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[Discussion] Job generation / submission hooks & Atlas integration

2020-02-05 Thread Gyula Fóra
Hi all!

We have started some preliminary work on the Flink - Atlas integration at
Cloudera. It seems that the integration will require some new hook
interfaces at the jobgraph generation and submission phases, so I figured I
will open a discussion thread with my initial ideas to get some early
feedback.

*Minimal background*
Very simply put, Apache Atlas is a data governance framework that stores
metadata for our data and processing logic to track ownership, lineage, etc.
It is already integrated with systems like HDFS, Kafka, Hive and many
others.

Adding Flink integration would mean that we can track the input and output data
of our Flink jobs, their owners, and how different Flink jobs are connected
to each other through the data they produce (lineage). This seems to be a
very big deal for a lot of companies :)

*Flink - Atlas integration in a nutshell*
In order to integrate with Atlas we basically need 2 things.
 - Flink entity definitions
 - Flink Atlas hook

The entity definition is the easy part. It is a JSON document that contains the
objects (entities) that we want to store for any given Flink job. As a
starter we could have a single FlinkApplication entity that has a set of
inputs and outputs. These inputs/outputs are other Atlas entities that are
already defined, such as a Kafka topic or an HBase table.

The Flink Atlas hook will be the logic that creates the entity instance and
uploads it to Atlas when we start a new Flink job. This is the part where
we implement the core logic.

*Job submission hook*
In order to implement the Atlas hook we need a place where we can inspect
the pipeline, create and send the metadata when the job starts. When we
create the FlinkApplication entity we need to be able to easily determine
the sources and sinks (and their properties) of the pipeline.

Unfortunately, there is no JobSubmission hook in Flink that could execute
this logic, and even if there were one, there is a mismatch of abstraction
levels needed to implement the integration.
We could imagine a JobSubmission hook executed in the JobManager runner as
this:

void onSuccessfulSubmission(JobGraph jobGraph, Configuration configuration);

This is nice, but the JobGraph makes it super difficult to extract the sources
and UDFs to create the metadata entity. The Atlas entity, however, could easily
be created from the StreamGraph object (used to represent the logical flow)
before the JobGraph is generated. To work around this limitation we could add a
JobGraphGeneratorHook interface:

void preProcess(StreamGraph streamGraph); void postProcess(JobGraph jobGraph);

We could then generate the Atlas entity in the preProcess step and add a
job submission hook in the postProcess step that will simply send the already
baked-in entity.
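
To make the two-phase idea concrete, here is a sketch of the proposed interface
together with a hypothetical Atlas implementation; the entity building and sending
are placeholders, since the exact StreamGraph accessors are part of what needs to
be settled:

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;

/** The proposed two-phase hook, as described above (sketch). */
public interface JobGraphGeneratorHook {
    void preProcess(StreamGraph streamGraph);
    void postProcess(JobGraph jobGraph);
}

/** Hypothetical Atlas implementation: build the entity early, ship it late. */
class AtlasJobGraphGeneratorHook implements JobGraphGeneratorHook {

    private String pendingEntityJson;

    @Override
    public void preProcess(StreamGraph streamGraph) {
        // Here we would walk the StreamGraph's sources/sinks and their UDFs to build the
        // FlinkApplication entity; the traversal is omitted because it depends on
        // non-public StreamGraph APIs, which is exactly the concern raised below.
        pendingEntityJson = "{ \"typeName\": \"flink_application\" }"; // placeholder
    }

    @Override
    public void postProcess(JobGraph jobGraph) {
        // By now the job id is known, so the "already baked" entity can be sent to Atlas,
        // e.g. via its REST API as suggested earlier in the thread.
        send(jobGraph.getJobID().toString(), pendingEntityJson);
    }

    private void send(String jobId, String entityJson) {
        // REST call to Atlas omitted in this sketch.
    }
}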

*This kinda works but...*
The approach outlined above seems to work and we have built a POC using it.
Unfortunately it is far from nice as it exposes non-public APIs such as the
StreamGraph. Also it feels a bit weird to have 2 hooks instead of one.

It would be much nicer if we could somehow go back from the JobGraph to the
StreamGraph, or at least have an easy way to access the source/sink UDFs.

What do you think?

Cheers,
Gyula


Re: [DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-05 Thread Zhenghua Gao
+1 to remove these methods.

One concern about invocations of TableSource::getTableSchema:
By removing such methods, we can stop calling TableSource::getTableSchema
in some places (such
as BatchTableEnvImpl/TableEnvironmentImpl#validateTableSource,
ConnectorCatalogTable, TableSourceQueryOperation).

But in other places we need the field types and names of the table source
(such as
BatchExecLookupJoinRule/StreamExecLookupJoinRule,
PushProjectIntoTableSourceScanRule,
CommonLookupJoin). So how should we deal with this?

*Best Regards,*
*Zhenghua Gao*


On Wed, Feb 5, 2020 at 2:36 PM Kurt Young  wrote:

> Hi all,
>
> I'd like to bring up a discussion about removing registration of
> TableSource and
> TableSink in TableEnvironment as well as in ConnectTableDescriptor. The
> affected
> method would be:
>
> TableEnvironment::registerTableSource
> TableEnvironment::fromTableSource
> TableEnvironment::registerTableSink
> ConnectTableDescriptor::registerTableSource
> ConnectTableDescriptor::registerTableSink
> ConnectTableDescriptor::registerTableSourceAndSink
>
> (Most of them are already deprecated, except for
> TableEnvironment::fromTableSource,
> which was intended to deprecate but missed by accident).
>
> FLIP-64 [1] already explained why we want to deprecate TableSource &
> TableSink from
> user's interface. In a short word, these interfaces should only read &
> write the physical
> representation of the table, and they are not fitting well after we already
> introduced some
> logical table fields such as computed column, watermarks.
>
> Another reason is the exposure of registerTableSource in Table Env just
> make the whole
> SQL protocol opposite. TableSource should be used as a reader of table, it
> should rely on
> other metadata information held by framework, which eventually comes from
> DDL or
> ConnectDescriptor. But if we register a TableSource to Table Env, we have
> no choice but
> have to rely on TableSource::getTableSchema. It will make the design
> obscure, sometimes
> TableSource should trust the information comes from framework, but
> sometimes it should
> also generate its own schema information.
>
> Furthermore, if the authority about schema information is not clear, it
> will make things much
> more complicated if we want to improve the table api usability such as
> introducing automatic
> schema inference in the near future.
>
> Since this is an API break change, I've also included user mailing list to
> gather more feedbacks.
>
> Best,
> Kurt
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
>


[jira] [Created] (FLINK-15918) Uptime Metric not reset on Job Restart

2020-02-05 Thread Gary Yao (Jira)
Gary Yao created FLINK-15918:


 Summary: Uptime Metric not reset on Job Restart
 Key: FLINK-15918
 URL: https://issues.apache.org/jira/browse/FLINK-15918
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.9.2, 1.10.0
Reporter: Gary Yao
 Fix For: 1.11.0, 1.10.1


*Description*
The {{uptime}} metric is not reset when the job restarts, which is a change in 
behavior compared to Flink 1.8.
This change of behavior exists since 1.9.0 if 
{{jobmanager.execution.failover-strategy: region}} is configured,
which we do in the default flink-conf.yaml.


*Workarounds*
Users that find this behavior problematic can set {{jobmanager.scheduler: 
legacy}} in their {{flink-conf.yaml}}


*How to reproduce*
trivial

*Expected behavior*
This is up for discussion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15917) Root Exception not shown in Web UI

2020-02-05 Thread Gary Yao (Jira)
Gary Yao created FLINK-15917:


 Summary: Root Exception not shown in Web UI
 Key: FLINK-15917
 URL: https://issues.apache.org/jira/browse/FLINK-15917
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.9.2, 1.10.0
Reporter: Gary Yao
 Fix For: 1.11.0, 1.10.1


*Description*
 On the job details page, in the Exceptions → Root Exception tab, exceptions
that cause the job to restart are not displayed.
 This has been a problem since 1.9.0 if
{{jobmanager.execution.failover-strategy: region}} is configured,
 which is the case in the default flink-conf.yaml.

*Workarounds*
 Users who run into this problem can set {{jobmanager.scheduler: legacy}} in
their {{flink-conf.yaml}}.

*How to reproduce*
 In {{flink-conf.yaml}}, set {{restart-strategy: fixed-delay}} to enable job
restarts.
{noformat}
$ bin/start-cluster.sh
$ bin/flink run -d examples/streaming/TopSpeedWindowing.jar
$ bin/taskmanager.sh stop
{noformat}
Observe that no exception is displayed in the Web UI.

*Expected behavior*
 The stacktrace of the exception should be displayed. Whether the exception 
should be also shown if only a partial region of the job failed is up for 
discussion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15916) Remove outdated sections for Network Buffers and Async Checkpoints from the Large State Tuning Guide

2020-02-05 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-15916:


 Summary: Remove outdated sections for Network Buffers and Async 
Checkpoints from the Large State Tuning Guide
 Key: FLINK-15916
 URL: https://issues.apache.org/jira/browse/FLINK-15916
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Stephan Ewen
Assignee: Stephan Ewen


The documentation page "Tuning Checkpoints and Large State" has a section on 
network buffers and async checkpoints.

The network buffers section is no longer relevant and even says so itself (it
refers to behavior before Flink 1.3).

The async snapshots section is also outdated (all state snapshots are async by
default now); it refers to a setup (RocksDB with heap timers) that is no longer
the default and already carries its own warning.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-05 Thread dianfu
Hi Jingsong,

Thanks a lot for the valuable feedback.

1. The configurations "python.fn-execution.bundle.size" and
"python.fn-execution.arrow.batch.size" serve separate purposes and I think they
are both needed. If they were unified, the Python operator would have to wait
for the execution results of the previous batch of elements before processing
the next batch, which means that the Python UDF execution could not be
pipelined between batches. With separate configurations there is no such problem.
2. Yes, it means that the Java operator converts input elements to the Arrow
memory format and then sends them to the Python worker, and vice versa (see the
sketch after this list). Regarding the zero-copy benefits provided by Arrow, we
gain them automatically by using Arrow.
3. Good point! As all the classes of the Python module are written in Java and
introducing new Scala classes is discouraged, I guess it's not easy to do so
right now. But I think this is definitely a good improvement we can make in the
future.
4. You're right, and we will add an Arrow ColumnVector for each supported type.
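
To make point 2 more concrete, here is a minimal, self-contained sketch of what
writing a batch of rows into Arrow's columnar format and serializing it with the
Arrow IPC streaming format could look like on the Java side. It uses the plain
Apache Arrow Java API (not Flink's internal ArrowFieldWriter), and the field
names are only for illustration:

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

public class ArrowBatchSketch {
    public static void main(String[] args) throws Exception {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
                BigIntVector id = new BigIntVector("id", allocator);
                VarCharVector name = new VarCharVector("name", allocator)) {

            // Fill one batch of rows into columnar Arrow vectors.
            int batchSize = 3;
            id.allocateNew(batchSize);
            name.allocateNew(batchSize);
            for (int i = 0; i < batchSize; i++) {
                id.setSafe(i, i);
                name.setSafe(i, ("row-" + i).getBytes(StandardCharsets.UTF_8));
            }
            id.setValueCount(batchSize);
            name.setValueCount(batchSize);

            // Serialize the batch with Arrow's IPC streaming format,
            // e.g. for shipping it to the Python worker.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (VectorSchemaRoot root = VectorSchemaRoot.of(id, name);
                    ArrowStreamWriter writer =
                            new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
                writer.start();
                writer.writeBatch();
                writer.end();
            }
            System.out.println("Arrow IPC payload: " + out.size() + " bytes");
        }
    }
}

The Python worker can then read the same stream with pyarrow and hand it to the
UDF as Pandas.Series, without any per-row serialization step.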

Thanks,
Dian

> On Feb 5, 2020, at 4:57 PM, Jingsong Li wrote:
> 
> Hi Dian,
> 
> +1 for this, thanks driving.
> Documentation looks very good. I can imagine a huge performance improvement
> and better integration to other Python libraries.
> 
> A few thoughts:
> - About data split: "python.fn-execution.arrow.batch.size", can we unify it
> with "python.fn-execution.bundle.size"?
> - Use of Apache Arrow as the exchange format: Do you mean Arrow support
> zero-copy between Java and Python?
> - ArrowFieldWriter seems we can implement it by code generation. But it is
> OK to initial version with virtual function call.
> - ColumnarRow for vectorization reading seems that we need implement
> ArrowColumnVectors.
> 
> Best,
> Jingsong Lee
> 
> On Wed, Feb 5, 2020 at 12:45 PM dianfu  wrote:
> 
>> Hi all,
>> 
>> Scalar Python UDF has already been supported in the coming release 1.10
>> (FLIP-58[1]). It operates one row at a time. It works in the way that the
>> Java operator serializes one input row to bytes and sends them to the
>> Python worker; the Python worker deserializes the input row and evaluates
>> the Python UDF with it; the result row is serialized and sent back to the
>> Java operator.
>> 
>> It suffers from the following problems:
>> 1) High serialization/deserialization overhead
>> 2) It’s difficult to leverage the popular Python libraries used by data
>> scientists, such as Pandas, Numpy, etc which provide high performance data
>> structure and functions.
>> 
>> Jincheng and I have discussed offline and we want to introduce vectorized
>> Python UDF to address the above problems. This feature has also been
>> mentioned in the discussion thread about the Python API plan[2]. For
>> vectorized Python UDF, a batch of rows are transferred between JVM and
>> Python VM in columnar format. The batch of rows will be converted to a
>> collection of Pandas.Series and given to the vectorized Python UDF which
>> could then leverage the popular Python libraries such as Pandas, Numpy, etc
>> for the Python UDF implementation.
>> 
>> Please refer the design doc[3] for more details and welcome any feedback.
>> 
>> Regards,
>> Dian
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-tt36119.html
>> [3]
>> https://docs.google.com/document/d/1ls0mt96YV1LWPHfLOh8v7lpgh1KsCNx8B9RrW1ilnoE/edit#heading=h.qivada1u8hwd
>> 
>> 
> 
> -- 
> Best, Jingsong Lee



Re: [DISCUSS] FLIP-91 - Support SQL Client Gateway

2020-02-05 Thread godfrey he
Hi all,

I also agree with Stephan and Timo that the SQL Client should be a simple
"shell around the table environment". Regarding "making this a standalone
project", I agree with Timo: keeping the SQL Client in the Flink codebase
preserves its integrity (it offers both an embedded mode and a gateway mode)
and the out-of-the-box experience.

> 1) Can we remove the JDBC discussion from this FLIP? It can be a
separate discussion. Let's focus on the gateway first.

JDBC is only a small part of the whole discussion, and only JDBC on batch
is covered by this FLIP, so we can create another FLIP to discuss JDBC
later.

> 2) Flink's current REST API is a custom implementation and does not rely
on any REST framework. It might make sense to not reuse code from
flink-runtime but use one of the commonly used framework. Maybe @Chesnay
in CC might have an opinion here?

I think we can create a new module named flink-rest-server as the common
framework for the Flink REST API and move the common REST-related code from
flink-runtime into it. What do you think? @Chesnay

> 3) The SQL Client gateway can only be a thin implementation if we also
find a long-term solution for retrieving results. Currently, all APIs
use dataStream/dataSet.collect() that might fail for larger data. We
should solve this issue first. For example, by specifying a temporary
result connector that would write to Kafka or file? Esp. if we would
like to base a JDBC interface on top of this, retrieving results must
handle big amounts of data consistently.

I agree with Timo that the gateway must handle large result data for JDBC,
but it makes little sense for the CLI client to display a large SELECT
result.
Based on the discussions in [0] and [1], I think table.collect (which returns
an iterator) can meet the requirement. If so, the SQL Client does not need a
temporary result connector to store large data sets.
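
To be clear about what I have in mind, here is only a sketch of the API shape
proposed in [0]/[1]; neither the method nor its return type exists yet, so the
names below are assumptions:

// Hypothetical sketch only: assumes a future Table#collect() that returns a
// closeable iterator over rows, as proposed in FLINK-13943 / FLINK-14807.
Table result = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");

try (CloseableIterator<Row> it = result.collect()) {  // assumed API
    while (it.hasNext()) {
        Row row = it.next();
        // The gateway could stream rows to a JDBC client page by page
        // instead of materializing the whole result set in memory.
        emitToClient(row);  // placeholder for the gateway's response channel
    }
}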

> 4) We should already start discussing changes to Table API before
implementing this FLIP. This includes parts of FLIP-84 for returning
"static result tables" for statements like "SHOW MODULES" or
table.collect().

I think we can push both FLIPs forward concurrently, because in FLIP-91 we
reuse the original code as much as possible, such as Executor and
SqlCommandParser. When implementing FLIP-84, we only need to change the
common code; there is no need to change gateway-specific code.


[0] https://issues.apache.org/jira/browse/FLINK-13943
[1] https://issues.apache.org/jira/browse/FLINK-14807

Best,
godfrey

Timo Walther wrote on Wed, Jan 22, 2020 at 9:25 PM:

> Hi everyone,
>
> I agree with Stephan that the SQL Client should be a simple "shell
> around the table environment". However, I see a contradiction in the
> mentioned advantages "not limited by Flink committer reviews" and
> "quicker independent releases". If most functionality must be contained
> in the table environment, most of the development will still happen in
> the main codebase and is limited by committers. The SQL Client is
> already part of the Flink codebase. Thus, I don't see an advantage of
> moving a thin REST API to some standalone project.
>
> Fabian, Aljoscha and I also went throught the proposal we had some
> concerns that are mentioned below. In general, this is a desirable
> feature that finalizes FLIP-24.
>
> 1) Can we remove the JDBC discussion from this FLIP? It can be a
> separate discussion. Let's focus on the gateway first.
>
> 2) Flink's current REST API is a custom implementation and does not rely
> on any REST framework. It might make sense to not reuse code from
> flink-runtime but use one of the commonly used framework. Maybe @Chesnay
> in CC might have an opinion here?
>
> 3) The SQL Client gateway can only be a thin implementation if we also
> find a long-term solution for retrieving results. Currently, all APIs
> use dataStream/dataSet.collect() that might fail for larger data. We
> should solve this issue first. For example, by specifying a temporary
> result connector that would write to Kafka or file? Esp. if we would
> like to base a JDBC interface on top of this, retrieving results must
> handle big amounts of data consistently.
>
> 4) We should already start discussing changes to Table API before
> implementing this FLIP. This includes parts of FLIP-84 for returning
> "static result tables" for statements like "SHOW MODULES" or
> table.collect().
>
> What do you think?
>
> Regards,
> Timo
>
>
>
> On 22.01.20 11:44, Stephan Ewen wrote:
> > Hi all!
> >
> > I think this is a useful feature.
> >
> > Two questions about this proposal:
> >
> > (1) The SQL client tried to be a hybrid between a SQL client and a
> gateway
> > server (which blew up in complexity and never finished). Would having a
> > dedicated gateway component mean that we can simplify the client and make
> > it a simple "shell around the table environment"? I think that would be
> > good, it would make it much easier to have new Table API features
> available
> > in the SQL 

[DISCUSS] Support Python ML Pipeline API

2020-02-05 Thread Hequn Cheng
Hi everyone,

FLIP-39[1] rebuilds the Flink ML pipeline on top of TableAPI and introduces
a new set of Java APIs. As Python is widely used in ML areas, providing
Python ML Pipeline APIs for Flink can not only make it easier to write ML
jobs for Python users but also broaden the adoption of Flink ML.

Given this, Jincheng and I discussed offline about the support of Python ML
Pipeline API and drafted a design doc[2]. We'd like to achieve three goals
for supporting Python Pipeline API:
- Add a Python pipeline API that mirrors the Java pipeline API (we will adapt
the Python pipeline API if the Java pipeline API changes).
- Support native Python Transformers/Estimators/Models, i.e., users can write
not only Python Transformer/Estimator/Model wrappers that call the Java ones
but also native Python Transformers/Estimators/Models.
- Ease of use: support keyword arguments when defining parameters.

More details can be found in the design doc and we are looking forward to
your feedback.

Best,
Hequn

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
[2]
https://docs.google.com/document/d/1fwSO5sRNWMoYuvNgfQJUV6N2n2q5UEVA4sezCljKcVQ/edit?usp=sharing


[jira] [Created] (FLINK-15915) Bump Jackson to 2.10.1 in flink-table-planner

2020-02-05 Thread Timo Walther (Jira)
Timo Walther created FLINK-15915:


 Summary: Bump Jackson to 2.10.1 in flink-table-planner
 Key: FLINK-15915
 URL: https://issues.apache.org/jira/browse/FLINK-15915
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-14104 bumped the Jackson version to 2.10.1 for most modules. The
flink-table-planner module is still using an outdated version due to a wrong
dependency management section.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Support scalar vectorized Python UDF in PyFlink

2020-02-05 Thread Jingsong Li
Hi Dian,

+1 for this, thanks for driving it.
The documentation looks very good. I can imagine a huge performance improvement
and better integration with other Python libraries.

A few thoughts:
- About data splitting: can we unify "python.fn-execution.arrow.batch.size"
with "python.fn-execution.bundle.size"?
- Use of Apache Arrow as the exchange format: do you mean Arrow supports
zero-copy between Java and Python?
- ArrowFieldWriter: it seems we could implement it via code generation, but it
is OK for the initial version to use virtual function calls.
- ColumnarRow for vectorized reading: it seems we need to implement
ArrowColumnVectors.

Best,
Jingsong Lee

On Wed, Feb 5, 2020 at 12:45 PM dianfu  wrote:

> Hi all,
>
> Scalar Python UDF has already been supported in the coming release 1.10
> (FLIP-58[1]). It operates one row at a time. It works in the way that the
> Java operator serializes one input row to bytes and sends them to the
> Python worker; the Python worker deserializes the input row and evaluates
> the Python UDF with it; the result row is serialized and sent back to the
> Java operator.
>
> It suffers from the following problems:
> 1) High serialization/deserialization overhead
> 2) It’s difficult to leverage the popular Python libraries used by data
> scientists, such as Pandas, Numpy, etc which provide high performance data
> structure and functions.
>
> Jincheng and I have discussed offline and we want to introduce vectorized
> Python UDF to address the above problems. This feature has also been
> mentioned in the discussion thread about the Python API plan[2]. For
> vectorized Python UDF, a batch of rows are transferred between JVM and
> Python VM in columnar format. The batch of rows will be converted to a
> collection of Pandas.Series and given to the vectorized Python UDF which
> could then leverage the popular Python libraries such as Pandas, Numpy, etc
> for the Python UDF implementation.
>
> Please refer the design doc[3] for more details and welcome any feedback.
>
> Regards,
> Dian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-What-parts-of-the-Python-API-should-we-focus-on-next-tt36119.html
> [3]
> https://docs.google.com/document/d/1ls0mt96YV1LWPHfLOh8v7lpgh1KsCNx8B9RrW1ilnoE/edit#heading=h.qivada1u8hwd
>
>

-- 
Best, Jingsong Lee


Re: [VOTE] Release 1.10.0, release candidate #1

2020-02-05 Thread Zhijiang
Hi Thomas,

The reason for the missing barrier alignment metric has been found, and I
created ticket [1] to track the progress. I guess it will be fixed soon. Thanks
for reporting this.

[1] https://issues.apache.org/jira/browse/FLINK-15914

Best,
Zhijiang 
--
From:Thomas Weise 
Sent At:2020 Feb. 5 (Wed.) 14:04
To:Gary Yao ; dev 
Subject:Re: [VOTE] Release 1.10.0, release candidate #1

Hi Gary,

Thanks for the reply.

-->

On Tue, Feb 4, 2020 at 5:20 AM Gary Yao  wrote:

> Hi Thomas,
>
> > 2) Was there a change in how job recovery reflects in the uptime metric?
> > Didn't uptime previously reset to 0 on recovery (now it just keeps
> > increasing)?
>
> The uptime is the difference between the current time and the time when the
> job transitioned to RUNNING state. By default we no longer transition the
> job
> out of the RUNNING state when restarting. This has something to do with the
> new scheduler which enables pipelined region failover by default [1].
> Actually
> we enabled pipelined region failover already in the binary distribution of
> Flink 1.9 by setting:
>
> jobmanager.execution.failover-strategy: region
>
> in the default flink-conf.yaml. Unless you have removed this config option
> or
> you are using a custom yaml, you should be seeing this behavior in Flink
> 1.9.
> If you do not want region failover, set
>
> jobmanager.execution.failover-strategy: full
>
>
We are using the default (the jobmanager.execution.failover-strategy
setting is not present in our flink config).

The change in behavior I see is between the 1.9 based deployment and the
1.10 RC.

Our 1.9 branch is here: https://github.com/lyft/flink/tree/release-1.9-lyft

I also notice that the exception causing a restart is no longer displayed
in the UI, which is probably related?


>
> > 1) Is the low watermark display in the UI still broken?
>
> I was not aware that this is broken. Is there an issue tracking this bug?
>

The watermark issue was https://issues.apache.org/jira/browse/FLINK-14470

(I don't have a good way to verify it is fixed at the moment.)

Another problem with this 1.10 RC is that the checkpointAlignmentTime
metric is missing. (I have not been able to investigate this further yet.)


>
> Best,
> Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-14651
>
> On Tue, Feb 4, 2020 at 2:56 AM Thomas Weise  wrote:
>
>> I opened a PR for FLINK-15868
>> :
>> https://github.com/apache/flink/pull/11006
>>
>> With that change, I was able to run an application that consumes from
>> Kinesis.
>>
>> I should have data tomorrow regarding the performance.
>>
>> Two questions/observations:
>>
>> 1) Is the low watermark display in the UI still broken?
>> 2) Was there a change in how job recovery reflects in the uptime metric?
>> Didn't uptime previously reset to 0 on recovery (now it just keeps
>> increasing)?
>>
>> Thanks,
>> Thomas
>>
>>
>>
>>
>> On Mon, Feb 3, 2020 at 10:55 AM Thomas Weise  wrote:
>>
>> > I found another issue with the Kinesis connector:
>> >
>> > https://issues.apache.org/jira/browse/FLINK-15868
>> >
>> >
>> > On Mon, Feb 3, 2020 at 3:35 AM Gary Yao  wrote:
>> >
>> >> Hi everyone,
>> >>
>> >> I am hereby canceling the vote due to:
>> >>
>> >> FLINK-15837
>> >> FLINK-15840
>> >>
>> >> Another RC will be created later today.
>> >>
>> >> Best,
>> >> Gary
>> >>
>> >> On Mon, Jan 27, 2020 at 10:06 PM Gary Yao  wrote:
>> >>
>> >> > Hi everyone,
>> >> > Please review and vote on the release candidate #1 for the version
>> >> 1.10.0,
>> >> > as follows:
>> >> > [ ] +1, Approve the release
>> >> > [ ] -1, Do not approve the release (please provide specific comments)
>> >> >
>> >> >
>> >> > The complete staging area is available for your review, which
>> includes:
>> >> > * JIRA release notes [1],
>> >> > * the official Apache source release and binary convenience releases
>> to
>> >> be
>> >> > deployed to dist.apache.org [2], which are signed with the key with
>> >> > fingerprint BB137807CEFBE7DD2616556710B12A1F89C115E8 [3],
>> >> > * all artifacts to be deployed to the Maven Central Repository [4],
>> >> > * source code tag "release-1.10.0-rc1" [5],
>> >> >
>> >> > The announcement blog post is in the works. I will update this voting
>> >> > thread with a link to the pull request soon.
>> >> >
>> >> > The vote will be open for at least 72 hours. It is adopted by
>> majority
>> >> > approval, with at least 3 PMC affirmative votes.
>> >> >
>> >> > Thanks,
>> >> > Yu & Gary
>> >> >
>> >> > [1]
>> >> >
>> >>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345845
>> >> > [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.10.0-rc1/
>> >> > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>> >> > [4]
>> >> https://repository.apache.org/content/repositories/orgapacheflink-1325
>> >> > [5] https://github.com/apache/flink/releases/tag/release-1.10.0-rc1
>> 

[jira] [Created] (FLINK-15914) Miss the barrier alignment metric for the case of two inputs

2020-02-05 Thread zhijiang (Jira)
zhijiang created FLINK-15914:


 Summary: Miss the barrier alignment metric for the case of two 
inputs
 Key: FLINK-15914
 URL: https://issues.apache.org/jira/browse/FLINK-15914
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Metrics
Reporter: zhijiang
Assignee: zhijiang


When the StreamTwoInputSelectableProcessor was introduced, the barrier
alignment metric was not added in its constructor. This did not cause problems
at the time, because only the StreamTwoInputProcessor was in use.

Now that StreamTwoInputProcessor has been replaced by
StreamTwoInputSelectableProcessor, this bug is exposed and the barrier
alignment metric is not reported for the case of two inputs.

The solution is to add this metric while constructing the current 
StreamTwoInputProcessor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-05 Thread Jingsong Li
Hi Kurt,

+1 to remove these methods.

One concern is that some of the current TableSources/TableSinks may not be
ready. For example, JDBCUpsertTableSink accepts a JDBCDialect, but at present
there is no way to pass the JDBCDialect in through the TableFactory. However,
I believe we have enough time in 1.11 to prepare them and then unify
everything on TableFactory.

There may be complaints from users because they used to implement only a
TableSource or TableSink, but now they also have to implement a TableFactory.
Still, I think forcing them to implement a TableFactory is good for conceptual
clarity.

Another idea: can we provide some utilities to help users implement a
TableFactory? The current approach is a little complex, and the factory class
also needs to be registered in the
"META-INF/services/org.apache.flink.table.factories.TableFactory" file (a
rough sketch follows below).

Best,
Jingsong Lee

On Wed, Feb 5, 2020 at 3:58 PM Dawid Wysakowicz 
wrote:

> Hi Kurt,
>
> I fully agree with the proposal. Yes it was an omission that we did not
> deprecate the TableEnvironment#fromTableSource in the previous version.
>
> I would vote to remove all those methods altogether.
>
> Best,
>
> Dawid
>
> On 05/02/2020 07:36, Kurt Young wrote:
> > Hi all,
> >
> > I'd like to bring up a discussion about removing registration of
> > TableSource and
> > TableSink in TableEnvironment as well as in ConnectTableDescriptor. The
> > affected
> > method would be:
> >
> > TableEnvironment::registerTableSource
> > TableEnvironment::fromTableSource
> > TableEnvironment::registerTableSink
> > ConnectTableDescriptor::registerTableSource
> > ConnectTableDescriptor::registerTableSink
> > ConnectTableDescriptor::registerTableSourceAndSink
> >
> > (Most of them are already deprecated, except for
> > TableEnvironment::fromTableSource,
> > which was intended to deprecate but missed by accident).
> >
> > FLIP-64 [1] already explained why we want to deprecate TableSource &
> > TableSink from
> > user's interface. In a short word, these interfaces should only read &
> > write the physical
> > representation of the table, and they are not fitting well after we
> already
> > introduced some
> > logical table fields such as computed column, watermarks.
> >
> > Another reason is the exposure of registerTableSource in Table Env just
> > make the whole
> > SQL protocol opposite. TableSource should be used as a reader of table,
> it
> > should rely on
> > other metadata information held by framework, which eventually comes from
> > DDL or
> > ConnectDescriptor. But if we register a TableSource to Table Env, we have
> > no choice but
> > have to rely on TableSource::getTableSchema. It will make the design
> > obscure, sometimes
> > TableSource should trust the information comes from framework, but
> > sometimes it should
> > also generate its own schema information.
> >
> > Furthermore, if the authority about schema information is not clear, it
> > will make things much
> > more complicated if we want to improve the table api usability such as
> > introducing automatic
> > schema inference in the near future.
> >
> > Since this is an API break change, I've also included user mailing list
> to
> > gather more feedbacks.
> >
> > Best,
> > Kurt
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
> >
>
>

-- 
Best, Jingsong Lee


Re: [DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-05 Thread Danny Chan
Thanks for driving this Kurt.

Because we already have DDL and the Descriptor API as alternatives to these
deprecated methods, removing them would reduce ambiguity and make the upcoming
work easier.

As we discussed offline, although some of the connectors may still have
attributes that cannot be specified through properties, removing these methods
forces us to rethink the TableFactory/properties interface.
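
Just to illustrate the alternative for anyone following along: with the DDL
route the schema lives in the catalog and the planner resolves the factory
itself, roughly like the sketch below. The connector and its properties are
made up for illustration and must match whatever connector/format is actually
on the classpath; in 1.10 the DDL string goes through TableEnvironment#sqlUpdate.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DdlInsteadOfRegisterTableSource {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Register the table through DDL; the catalog entry carries the schema.
        tEnv.sqlUpdate(
                "CREATE TABLE orders (" +
                "  order_id BIGINT," +
                "  amount DOUBLE" +
                ") WITH (" +
                "  'connector.type' = 'filesystem'," +
                "  'connector.path' = '/tmp/orders.csv'," +
                "  'format.type' = 'csv'" +
                ")");

        // The schema now comes from the catalog entry created by the DDL,
        // not from TableSource#getTableSchema.
        Table orders = tEnv.sqlQuery(
                "SELECT order_id, SUM(amount) FROM orders GROUP BY order_id");
        System.out.println(orders.getSchema());
    }
}

The same table could equally be declared through the Descriptor API; in both
cases the factory and schema are resolved by the framework rather than supplied
by a pre-built TableSource instance.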

Best,
Danny Chan
On Feb 5, 2020 at 3:58 PM +0800, Dawid Wysakowicz wrote:
> Hi Kurt,
>
> I fully agree with the proposal. Yes it was an omission that we did not
> deprecate the TableEnvironment#fromTableSource in the previous version.
>
> I would vote to remove all those methods altogether.
>
> Best,
>
> Dawid
>
> On 05/02/2020 07:36, Kurt Young wrote:
> > Hi all,
> >
> > I'd like to bring up a discussion about removing registration of
> > TableSource and
> > TableSink in TableEnvironment as well as in ConnectTableDescriptor. The
> > affected
> > method would be:
> >
> > TableEnvironment::registerTableSource
> > TableEnvironment::fromTableSource
> > TableEnvironment::registerTableSink
> > ConnectTableDescriptor::registerTableSource
> > ConnectTableDescriptor::registerTableSink
> > ConnectTableDescriptor::registerTableSourceAndSink
> >
> > (Most of them are already deprecated, except for
> > TableEnvironment::fromTableSource,
> > which was intended to deprecate but missed by accident).
> >
> > FLIP-64 [1] already explained why we want to deprecate TableSource &
> > TableSink from
> > user's interface. In a short word, these interfaces should only read &
> > write the physical
> > representation of the table, and they are not fitting well after we already
> > introduced some
> > logical table fields such as computed column, watermarks.
> >
> > Another reason is the exposure of registerTableSource in Table Env just
> > make the whole
> > SQL protocol opposite. TableSource should be used as a reader of table, it
> > should rely on
> > other metadata information held by framework, which eventually comes from
> > DDL or
> > ConnectDescriptor. But if we register a TableSource to Table Env, we have
> > no choice but
> > have to rely on TableSource::getTableSchema. It will make the design
> > obscure, sometimes
> > TableSource should trust the information comes from framework, but
> > sometimes it should
> > also generate its own schema information.
> >
> > Furthermore, if the authority about schema information is not clear, it
> > will make things much
> > more complicated if we want to improve the table api usability such as
> > introducing automatic
> > schema inference in the near future.
> >
> > Since this is an API break change, I've also included user mailing list to
> > gather more feedbacks.
> >
> > Best,
> > Kurt
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
> >
>


Re: [DISCUSS] Improve TableFactory

2020-02-05 Thread Rui Li
+1, thanks for the efforts.

On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li  wrote:

> Hi all,
>
> As Jark suggested in VOTE thread.
> JIRA created: https://issues.apache.org/jira/browse/FLINK-15912
>
> Best,
> Jingsong Lee
>
> On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li 
> wrote:
>
> > Hi Timo,
> >
> > Good catch!
> >
> > I really love the idea 2, a full Flink config looks very good to me.
> >
> > Try to understand your first one, actually we don't have
> `TableIdentifier`
> > class now. But TableFactory already indicate table. So I am OK.
> >
> > New Context should be:
> >
> >/**
> > * Context of table source creation. Contains table information and
> environment information.
> > */
> >interface Context {
> >   /**
> >* @return full identifier of the given {@link CatalogTable}.
> >*/
> >   ObjectIdentifier getObjectIdentifier();
> >   /**
> >* @return table {@link CatalogTable} instance.
> >*/
> >   CatalogTable getTable();
> >   /**
> >* @return readable config of this table environment.
> >*/
> >   ReadableConfig getConfiguration();
> >}
> >
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Feb 4, 2020 at 8:51 PM Timo Walther  wrote:
> >
> >> Hi Jingsong,
> >>
> >> some last minute changes from my side:
> >>
> >> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> >> obvious. Otherwise people expect a `TableIdentifier` class being
> >> returned here.
> >>
> >> 2. rename `getTableConfig` to `getConfiguration()` in the future this
> >> will not only be a "table" config but might give access to the full
> >> Flink config
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >> On 04.02.20 06:27, Jingsong Li wrote:
> >> > So the interface will be:
> >> >
> >> > public interface TableSourceFactory extends TableFactory {
> >> > ..
> >> >
> >> > /**
> >> >  * Creates and configures a {@link TableSource} based on the given
> >> > {@link Context}.
> >> >  *
> >> >  * @param context context of this table source.
> >> >  * @return the configured table source.
> >> >  */
> >> > default TableSource createTableSource(Context context) {
> >> >ObjectIdentifier tableIdentifier =
> context.getTableIdentifier();
> >> >return createTableSource(
> >> >  new ObjectPath(tableIdentifier.getDatabaseName(),
> >> > tableIdentifier.getObjectName()),
> >> >  context.getTable());
> >> > }
> >> > /**
> >> >  * Context of table source creation. Contains table information
> and
> >> > environment information.
> >> >  */
> >> > interface Context {
> >> >/**
> >> > * @return full identifier of the given {@link CatalogTable}.
> >> > */
> >> >ObjectIdentifier getTableIdentifier();
> >> >/**
> >> > * @return table {@link CatalogTable} instance.
> >> > */
> >> >CatalogTable getTable();
> >> >/**
> >> > * @return readable config of this table environment.
> >> > */
> >> >ReadableConfig getTableConfig();
> >> > }
> >> > }
> >> >
> >> > public interface TableSinkFactory extends TableFactory {
> >> > ..
> >> > /**
> >> >  * Creates and configures a {@link TableSink} based on the given
> >> > {@link Context}.
> >> >  *
> >> >  * @param context context of this table sink.
> >> >  * @return the configured table sink.
> >> >  */
> >> > default TableSink createTableSink(Context context) {
> >> >ObjectIdentifier tableIdentifier =
> context.getTableIdentifier();
> >> >return createTableSink(
> >> >  new ObjectPath(tableIdentifier.getDatabaseName(),
> >> > tableIdentifier.getObjectName()),
> >> >  context.getTable());
> >> > }
> >> > /**
> >> >  * Context of table sink creation. Contains table information and
> >> > environment information.
> >> >  */
> >> > interface Context {
> >> >/**
> >> > * @return full identifier of the given {@link CatalogTable}.
> >> > */
> >> >ObjectIdentifier getTableIdentifier();
> >> >/**
> >> > * @return table {@link CatalogTable} instance.
> >> > */
> >> >CatalogTable getTable();
> >> >/**
> >> > * @return readable config of this table environment.
> >> > */
> >> >ReadableConfig getTableConfig();
> >> > }
> >> > }
> >> >
> >> >
> >> > Best,
> >> > Jingsong Lee
> >> >
> >> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li 
> >> wrote:
> >> >
> >> >> Hi all,
> >> >>
> >> >> After rethinking and discussion with Kurt, I'd like to remove
> >> "isBounded".
> >> >> We can delay this is bounded message to TableSink.
> >> >> With TableSink refactor, we need consider "consumeDataStream"
> >> >> and "consumeBoundedStream".
> >> >>
> >> >> Best,
> >> >> Jingsong Lee
> >> >>
> >> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li 
> >> wrote:
> >> >>
> >> >>> Hi 

Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #0

2020-02-05 Thread jincheng sun
Hi Wei,

Thanks for your vote, and I appreciate that you are kindly helping to take
the tickets.

I've assigned the JIRAs to you!

Best,
Jincheng


Wei Zhong wrote on Wed, Feb 5, 2020 at 3:55 PM:

> Hi,
>
> Thanks for driving this, Jincheng.
>
> +1 (non-binding)
>
> - Verified signatures and checksums.
> - `pip install apache-flink-1.9.2.tar.gz` successfully.
> - Start local pyflink shell via `pyflink-shell.sh local` and try the
> examples in the help message, run well and no exception.
> - Try a word count example in IDE, run well and no exception.
>
> In addition I'm willing to take these JIRAs. Could you assign them to me?
> :)
>
> Best,
> Wei
>
>
> > On Feb 5, 2020, at 14:49, jincheng sun wrote:
> >
> > Hi everyone,
> >
> > Please review and vote on the release candidate #0 for the PyFlink
> version
> > 1.9.2, as follows:
> >
> > [ ] +1, Approve the release
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> > The complete staging area is available for your review, which includes:
> > * the official Apache source release and binary convenience releases to
> be
> > deployed to dist.apache.org [1], which are signed with the key with
> > fingerprint 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2],
> > * source code tag "release-1.9.2" [3],
> > * create JIRA. for add description of support 'pip install' to 1.9.x
> > documents[4]
> > * create JIRA. for add PyPI release process for subsequent version
> release
> > of 1.9.x . i.e. improve the script of `create-binary-release. sh`.[5]
> >
> > The vote will be open for at least 72 hours. It is adopted by majority
> > approval, with at least 3 PMC affirmative votes.
> >
> > Thanks,
> > Jincheng
> >
> > [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc0/
> > [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> > [3] https://github.com/apache/flink/tree/release-1.9.2
> > [4] https://issues.apache.org/jira/browse/FLINK-15908
> > [5] https://issues.apache.org/jira/browse/FLINK-15909
>
>


[jira] [Created] (FLINK-15913) Add Python Table Function Runner And Operator

2020-02-05 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-15913:


 Summary: Add Python Table Function Runner And Operator
 Key: FLINK-15913
 URL: https://issues.apache.org/jira/browse/FLINK-15913
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.11.0


Add Python Table Function Runner and Operator



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Improve TableFactory

2020-02-05 Thread Jingsong Li
Hi all,

As Jark suggested in VOTE thread.
JIRA created: https://issues.apache.org/jira/browse/FLINK-15912

Best,
Jingsong Lee

On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li  wrote:

> Hi Timo,
>
> Good catch!
>
> I really love the idea 2, a full Flink config looks very good to me.
>
> Try to understand your first one, actually we don't have `TableIdentifier`
> class now. But TableFactory already indicate table. So I am OK.
>
> New Context should be:
>
>/**
> * Context of table source creation. Contains table information and 
> environment information.
> */
>interface Context {
>   /**
>* @return full identifier of the given {@link CatalogTable}.
>*/
>   ObjectIdentifier getObjectIdentifier();
>   /**
>* @return table {@link CatalogTable} instance.
>*/
>   CatalogTable getTable();
>   /**
>* @return readable config of this table environment.
>*/
>   ReadableConfig getConfiguration();
>}
>
>
> Best,
> Jingsong Lee
>
> On Tue, Feb 4, 2020 at 8:51 PM Timo Walther  wrote:
>
>> Hi Jingsong,
>>
>> some last minute changes from my side:
>>
>> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
>> obvious. Otherwise people expect a `TableIdentifier` class being
>> returned here.
>>
>> 2. rename `getTableConfig` to `getConfiguration()` in the future this
>> will not only be a "table" config but might give access to the full
>> Flink config
>>
>> Thanks,
>> Timo
>>
>>
>> On 04.02.20 06:27, Jingsong Li wrote:
>> > So the interface will be:
>> >
>> > public interface TableSourceFactory extends TableFactory {
>> > ..
>> >
>> > /**
>> >  * Creates and configures a {@link TableSource} based on the given
>> > {@link Context}.
>> >  *
>> >  * @param context context of this table source.
>> >  * @return the configured table source.
>> >  */
>> > default TableSource createTableSource(Context context) {
>> >ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >return createTableSource(
>> >  new ObjectPath(tableIdentifier.getDatabaseName(),
>> > tableIdentifier.getObjectName()),
>> >  context.getTable());
>> > }
>> > /**
>> >  * Context of table source creation. Contains table information and
>> > environment information.
>> >  */
>> > interface Context {
>> >/**
>> > * @return full identifier of the given {@link CatalogTable}.
>> > */
>> >ObjectIdentifier getTableIdentifier();
>> >/**
>> > * @return table {@link CatalogTable} instance.
>> > */
>> >CatalogTable getTable();
>> >/**
>> > * @return readable config of this table environment.
>> > */
>> >ReadableConfig getTableConfig();
>> > }
>> > }
>> >
>> > public interface TableSinkFactory extends TableFactory {
>> > ..
>> > /**
>> >  * Creates and configures a {@link TableSink} based on the given
>> > {@link Context}.
>> >  *
>> >  * @param context context of this table sink.
>> >  * @return the configured table sink.
>> >  */
>> > default TableSink createTableSink(Context context) {
>> >ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>> >return createTableSink(
>> >  new ObjectPath(tableIdentifier.getDatabaseName(),
>> > tableIdentifier.getObjectName()),
>> >  context.getTable());
>> > }
>> > /**
>> >  * Context of table sink creation. Contains table information and
>> > environment information.
>> >  */
>> > interface Context {
>> >/**
>> > * @return full identifier of the given {@link CatalogTable}.
>> > */
>> >ObjectIdentifier getTableIdentifier();
>> >/**
>> > * @return table {@link CatalogTable} instance.
>> > */
>> >CatalogTable getTable();
>> >/**
>> > * @return readable config of this table environment.
>> > */
>> >ReadableConfig getTableConfig();
>> > }
>> > }
>> >
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li 
>> wrote:
>> >
>> >> Hi all,
>> >>
>> >> After rethinking and discussion with Kurt, I'd like to remove
>> "isBounded".
>> >> We can delay this is bounded message to TableSink.
>> >> With TableSink refactor, we need consider "consumeDataStream"
>> >> and "consumeBoundedStream".
>> >>
>> >> Best,
>> >> Jingsong Lee
>> >>
>> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li 
>> wrote:
>> >>
>> >>> Hi Jark,
>> >>>
>> >>> Thanks involving, yes, it's hard to understand to add isBounded on the
>> >>> source.
>> >>> I recommend adding only to sink at present, because sink has upstream.
>> >>> Its upstream is either bounded or unbounded.
>> >>>
>> >>> Hi all,
>> >>>
>> >>> Let me summarize with your suggestions.
>> >>>
>> >>> public interface TableSourceFactory extends TableFactory {
>> >>>
>> >>>