Re: [Discussion] FLIP-79 Flink Function DDL Support

2019-10-31 Thread Peter Huang
Hi Terry,

Thanks for the quick response. We are on the same page. For the
properties of function DDL, let's see whether there is such a need from
other people.
I will start voting on the design in 24 hours.


Best Regards
Peter Huang

On Thu, Oct 31, 2019 at 3:18 AM Terry Wang  wrote:

> Hi Peter,
>
> I’d like to share some thoughts from my side:
> 1. what's the syntax to distinguish function language ?
> +1 for using `[LANGUAGE JVM|PYTHON] USING JAR` (see the sketch after this list)
> 2. How to persist function language in backend catalog ?
> +1 for a separate field in CatalogFunction. But as to the specific
> backend, we may persist it case by case. A special case is how
> HiveCatalog stores the kind of CatalogFunction.
> 3. do we really need to allow users to set a properties map for a udf?
> There are use cases requiring passing external arguments to a udf for
> sure, but the need can also be met by passing arguments to `eval` when
> calling the udf in SQL.
> IMO, there is not much need to support setting a properties map for a udf.
>
> 4. Should a catalog implementation be able to decide whether it can take a
> properties map, and which language of a udf it can persist?
> IMO, it’s necessary for the catalog implementation to provide such
> information. But for the Flink 1.10 MVP goal, we can just skip this part.
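For readers skimming the thread, here is a minimal sketch of option two ("[LANGUAGE JVM|PYTHON] USING JAR") from point 1 above. The keywords are still under discussion in FLIP-79, and the class name, jar paths and the `sqlUpdate` registration path are illustrative assumptions only:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(
  EnvironmentSettings.newInstance().useBlinkPlanner().build())

// Sketch of FLIP-79 option two; multiple resources listed as in HQL.
tEnv.sqlUpdate(
  """
    |CREATE FUNCTION my_lower
    |  AS 'com.example.udf.MyLower'
    |  LANGUAGE JVM
    |  USING JAR 'hdfs:///udfs/my-udfs.jar', JAR 'hdfs:///udfs/my-udf-deps.jar'
  """.stripMargin)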
>
>
>
> Best,
> Terry Wang
>
>
>
> > On Oct 30, 2019, at 13:52, Peter Huang  wrote:
> >
> > Hi Bowen,
> >
> > I can't agree more that we should first reach an agreement on the DDL syntax
> > and focus on the MVP in the current phase.
> >
> > 1) what's the syntax to distinguish function language
> > Currently, there are two opinions:
> >
> >   - USING 'python .'
> >   - [LANGUAGE JVM|PYTHON] USING JAR '...'
> >
> > As we need to support multiple resources as HQL does, we shouldn't repeat
> > the language symbol as a suffix of each resource.
> > I would prefer option two, but definitely open to more comments.
> >
> > 2) How to persist function language in backend catalog? as a k-v pair in
> > properties map, or a dedicated field?
> > Even though language type is also a property, I think a separate field in
> > CatalogFunction is a cleaner solution.
> >
> > 3) do we really need to allow users to set a properties map for udf? what
> needs
> > to be stored there? what are they used for?
> >
> > I am considering a type of use case that uses UDFs for realtime inference.
> > The model is nested in the udf as a resource, but there are multiple
> > parameters that are customizable. In this way, the user can use properties
> > to define those parameters (a sketch follows).
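Purely as an illustration of the inference use case above (the WITH clause, property keys and class name below are hypothetical, not agreed syntax; `tEnv` is created as in the earlier sketch):

// Hypothetical sketch: attach tunable model parameters to the function definition
// instead of hard-coding them in the udf or passing them on every eval() call.
tEnv.sqlUpdate(
  """
    |CREATE FUNCTION fraud_score
    |  AS 'com.example.udf.FraudScoringModel'
    |  LANGUAGE JVM
    |  USING JAR 'hdfs:///udfs/fraud-model.jar'
    |  WITH ('model.threshold' = '0.85', 'model.version' = '3')
  """.stripMargin)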
> >
> > I only have answers to these questions. For questions about the catalog
> > implementation, I hope we can collect more feedback from the community.
> >
> >
> > Best Regards
> > Peter Huang
> >
> >
> >
> >
> >
> > Best Regards
> > Peter Huang
> >
> > On Tue, Oct 29, 2019 at 11:31 AM Bowen Li  wrote:
> >
> >> Hi all,
> >>
> >> Besides all the good questions raised above, we all seem to agree to have an
> >> MVP for Flink 1.10, "to support users to create and persist a java
> >> class-based udf that's already in classpath (no extra resource loading),
> >> and use it later in queries".
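As a concrete picture of that MVP, a minimal sketch (the CREATE FUNCTION DDL is exactly what is being designed in this thread, so treat it as illustrative; the class, function and table names are made up, and the class is assumed to already be on the classpath):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(
  EnvironmentSettings.newInstance().useBlinkPlanner().build())

// create and persist a JVM class-based udf that is already on the classpath
tEnv.sqlUpdate("CREATE FUNCTION my_upper AS 'com.example.udf.MyUpper'")

// ... and use it later in queries
tEnv.sqlUpdate("INSERT INTO sink_table SELECT my_upper(name) FROM source_table")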
> >>
> >> IIUIC, to achieve that in 1.10, the following are currently the core
> >> issues/blockers we should figure out, and solve them as our **highest
> >> priority**:
> >>
> >> - what's the syntax to distinguish function language (java, scala,
> python,
> >> etc)? we only need to implement the java one in 1.10 but have to settle
> >> down the long term solution
> >> - how to persist function language in backend catalog? as a k-v pair in
> >> properties map, or a dedicated field?
> >> - do we really need to allow users to set a properties map for udf? what
> needs
> >> to be stored there? what are they used for?
> >> - should a catalog impl be able to decide whether it can take a
> properties
> >> map (if we decide to have one), and which language of a udf it can
> persist?
> >>   - E.g. Hive metastore, which backs Flink's HiveCatalog, cannot take a
> >> properties map and is only able to persist java udf [1], unless we do
> >> something hacky to it
> >>
> >> I feel these questions are essential to Flink functions in the long run,
> >> but most importantly, are also the minimum scope for Flink 1.10. Aspects
> >> like resource loading security or compatibility with Hive syntax are
> >> important too, however if we focus on them now, we may not be able to
> get
> >> the MVP out in time.
> >>
> >> [1]
> >> -
> >>
> >>
> https://hive.apache.org/javadocs/r3.1.2/api/org/apache/hadoop/hive/metastore/api/Function.html
> >> -
> >>
> >>
> https://hive.apache.org/javadocs/r3.1.2/api/org/apache/hadoop/hive/metastore/api/FunctionType.html
> >>
> >>
> >>
> >> On Sun, Oct 27, 2019 at 8:22 PM Peter Huang  >
> >> wrote:
> >>
> >>> Hi Timo,
> >>>
> >>> Thanks for the feedback. I replied and adjust the design accordingly.
> For
> >>> the concern of class loading.
> >>> I think we need to distinguish the function class loading for Temporary
> >> and
> 

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Peter Huang
Hi Tison and Community,

Thanks for bringing it up. Actually, we hit a similar bottleneck when using
per-job cluster mode. Our team built a service for deploying and operating
Flink jobs.
The service sits in front of YARN clusters. To submit different job jars,
we need to download the client jar into the service and generate a job
graph, which is time-consuming.
Thus, we came up with the idea of a Delayed Job Graph, which moves job graph
generation into the ClusterEntryPoint rather than the client side. Compared to
your proposal, it is more lightweight, and it is an option for the existing
per-job mode. But it is not a solution for handling multiple job graphs within
a program.

I am looking forward to more comments on the proposal, and also definitely
cooperation on this effort.
I hope both of our pain points can be resolved and contribute back to the
community.


https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t


Best Regards
Peter Huang

On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy  wrote:

> Hi all,
>
>
> Firstly thanks @tison for bringing this up and strongly +1 for the overall
> design.
>
>
> I’d like to add one more example of "multiple jobs in one program" with
> what I’m currently working on. I’m trying to run a TPC-DS benchmark
> (including tens of SQL query jobs) on Flink and suffering a lot from
> maintaining the client, because I can’t run this program in per-job mode and
> have to keep the client attached.
>
>
> Back to our discussion, I can see now there is a divergence on whether to
> compile the job graph in the client or in the ClusterEntrypoint, and upsides
> and downsides exist either way. As for the opt-in solution, I have a
> question: what if the user chooses detach mode, compiles in the client, and
> runs a multi-job program at the same time? It's still not going to work.
>
> Besides, by adding a compiling option, we need to consider more things
> when submitting a job, like "Does my program include multiple jobs?" or "Does
> the program need to be initialized before submitting to a remote cluster?",
> which looks a bit complicated and confusing to me.
>
>
> To summarize, I'll vote for the new per-program concept, but I may not
> prefer the opt-in option mentioned in the mailing list; maybe we need to
> reconsider a better concept and definition that is easy to understand.
>
>
>
> Best,
>
> Jiayi Liao
>
>  Original Message
> *Sender:* Rong Rong
> *Recipient:* Regina" 
> *Cc:* Theo Diefenthal;
> u...@flink.apache.org
> *Date:* Friday, Nov 1, 2019 11:01
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
> Hi All,
>
> Thanks @Tison for starting the discussion; I think we have a very similar
> scenario to Theo's use cases.
> In our case we also generate the job graph using a client service (which
> serves job graph generation for multiple user programs), and we've found
> that managing the upload/download between the cluster and the DFS is tricky
> and error-prone. In addition, managing the different environments and
> requirements of different users in a single service poses even more trouble
> for us.
>
> However, shifting the job graph generation towards the cluster side also
> requires some thoughts regarding how to manage the driver-job as well as
> some dependencies conflicts - In the case for shipping the job graph
> generation to the cluster, some unnecessary dependencies for the runtime
> will be pulled in by the driver-job (correct me if I were wrong Theo)
>
> I think in general I agree with @Gyula's main point: unless there is a
> very strong reason, it is better if we put the driver-mode as an opt-in (at
> least at the beginning).
> I left some comments on the document as well. Please kindly take a look.
>
> Thanks,
> Rong
>
> On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:
>
>> Yeah just chiming in this conversation as well. We heavily use multiple
>> job graphs to get isolation around retry logic and resource allocation
>> across the job graphs. Putting all these parallel flows into a single graph
>> would mean sharing of TaskManagers across what was meant to be truly
>> independent.
>>
>>
>>
>> We also build our job graphs dynamically based off of the state of the
>> world at the start of the job. While we’ve had a share of the pain
>> described, my understanding is that there would be a tradeoff in number of
>> jobs being submitted to the cluster and corresponding resource allocation
>> requests. In the model with multiple jobs in a program, there’s at least
>> the opportunity to reuse idle taskmanagers.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From:* Theo Diefenthal 
>> *Sent:* Thursday, October 31, 2019 10:56 AM
>> *To:* u...@flink.apache.org
>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>
>>
>>
>> I agree with Gyula Fora,
>>
>>
>>
>> In our case, we have a client-machine in the middle between our YARN
>> cluster and some backend services, which can not be 

[jira] [Created] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

2019-10-31 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-14591:
-

 Summary:  Execute PlannerBase#mergeParameters every time of 
calling PlannerBase#translate method
 Key: FLINK-14591
 URL: https://issues.apache.org/jira/browse/FLINK-14591
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Wei Zhong


In the current implementation of the blink planner, the method 
"PlannerBase#mergeParameters" is called by the "PlannerBase#translate" method 
to merge the configuration inside TableConfig into the global job parameters:
{code:scala}
  override def translate(
  modifyOperations: util.List[ModifyOperation]): 
util.List[Transformation[_]] = {
if (modifyOperations.isEmpty) {
  return List.empty[Transformation[_]]
}
mergeParameters()
val relNodes = modifyOperations.map(translateToRel)
val optimizedRelNodes = optimize(relNodes)
val execNodes = translateToExecNodePlan(optimizedRelNodes)
translateToPlan(execNodes)
  }
{code}
This translate method is called at every important moment, e.g. execute, 
toDataStream, insertInto, etc.

But as shown above, there is a chance that the method returns directly without 
calling "mergeParameters".

In fact, if we set some configurations between the "Table#insertInto" call and 
the "TableEnvironment#execute" call, these configurations will not be merged into 
the global job parameters because the "mergeParameters" method is not called again:
{code:scala}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.newInstance.useBlinkPlanner.build)
...
...
val result = ...
val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
tEnv.registerTableSink("MySink", sink)
tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
result.insertInto("MySink")

// the "jobparam2" configuration will loss
tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
tEnv.execute("test")
val jobConfig = env.getConfig.getGlobalJobParameters.toMap

assertTrue(jobConfig.get("jobparam1")=="value1")
// this assertion will fail:
assertTrue(jobConfig.get("jobparam2")=="value2"){code}
This may confuse users. It would be great if we could fix this problem.
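One possible shape of the fix, following the issue title (make sure the merge runs on every call to "translate", before the early return). This is only a sketch, not a reviewed patch:
{code:scala}
  override def translate(
      modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
    // merge the TableConfig into the global job parameters unconditionally, so that
    // configurations set after the last modify operation are still picked up
    mergeParameters()
    if (modifyOperations.isEmpty) {
      return List.empty[Transformation[_]]
    }
    val relNodes = modifyOperations.map(translateToRel)
    val optimizedRelNodes = optimize(relNodes)
    val execNodes = translateToExecNodePlan(optimizedRelNodes)
    translateToPlan(execNodes)
  }
{code}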

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: FW: Unit testing for ProcessAllWindowFunction

2019-10-31 Thread vino yang
Hi Diogo,

In order to test ProcessAllWindowFunction, you need to do a little more.

You can refer to Flink's own test code.[1]

In short:

1) Write a Flink streaming job that uses your UDF (here a
ProcessAllWindowFunction) and returns a DataStream.
2) Get the OneInputTransformation through the DataStream, and then get the
OneInputStreamOperator;
3) Cast the OneInputStreamOperator to WindowOperator;
4) Use the test harness, please refer here [2]. A sketch putting these steps
together follows.
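To make the four steps concrete, here is a minimal sketch in Scala, written against Flink 1.9 APIs (method names such as getOperator may differ slightly in other versions); MyProcessAllWindowFunction is just a stand-in for your own function:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

// stand-in for the function under test
class MyProcessAllWindowFunction extends ProcessAllWindowFunction[String, String, TimeWindow] {
  override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit =
    out.collect(elements.mkString("|"))
}

// 1) a small job that uses the function
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val windowed = env
  .fromElements("a", "b")
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .process(new MyProcessAllWindowFunction)

// 2) + 3) dig the WindowOperator out of the generated transformation
val transform = windowed.javaStream.getTransformation
  .asInstanceOf[OneInputTransformation[String, String]]
val operator = transform.getOperator
  .asInstanceOf[WindowOperator[java.lang.Byte, String, _, String, _]]

// 4) drive it with a keyed test harness (windowAll keys every record to a constant byte)
val harness = new KeyedOneInputStreamOperatorTestHarness[java.lang.Byte, String, String](
  operator, operator.getKeySelector, BasicTypeInfo.BYTE_TYPE_INFO)
harness.open()
harness.processElement(new StreamRecord[String]("a", 100L))
harness.processWatermark(new Watermark(Long.MaxValue)) // fires the event-time window
// assert on harness.getOutput, then
harness.close()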

Best,
Vino

[1]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java#L732
[2]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java#L1405

Diogo Araújo  wrote on Fri, Nov 1, 2019 at 1:31 AM:

> Good afternoon,
>
>
>
> After Reading the official flink testing documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html
> )
>
> I was able to develop tests for a ProcessFunction, using a Test Harness,
> something like this:
>
>
>
> *pendingPartitionBuilder *= new PendingPartitionBuilder(":::some_name", ""
> )
>
> *testHarness *=
>   new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData,
> PendingPartition](
> new ProcessOperator[StaticAdequacyTilePublishedData,PendingPartition](
> *pendingPartitionBuilder*)
>   )
>
> *testHarness*.open()
>
>
>
>
> now, I’m trying to do the same for a ProcessAllWindowFunction.
>
> First I realized I can’t use TestHarness for  ProcessAllWindowFunction,
> because it doesn’t have a processElement method. In this case, what unit
> test strategy should I follow?
>
>
>
>
>
> *Diogo Araújo* | Rockstar Developer
> diogo.ara...@criticaltechworks.com
> +351 912882824
> Rua do Campo Alegre, nº 17, piso 0 | 4150-177 Porto
> www.criticaltechworks.com
> 
>
>
>
>
>
> *From: *Diogo Araújo 
> *Date: *Thursday, 31 October 2019 at 16:55
> *To: *"u...@flink.apache.org" 
> *Subject: *Unit testing for ProcessAllWindowFunction
>
>
>
> Good afternoon,
>
>
>
> After Reading the official flink testing documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html
> )
>
> I was able to develop tests for a ProcessFunction, using a Test Harness,
> something like this:
>
>
>
> *pendingPartitionBuilder *= new PendingPartitionBuilder(":::some_name", ""
> )
>
> *testHarness *=
>   new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData,
> PendingPartition](
> new ProcessOperator[StaticAdequacyTilePublishedData,
> PendingPartition](*pendingPartitionBuilder*)
>   )
>
> *testHarness*.open()
>
>
>
>
> now, I’m trying to do the same for a ProcessAllWindowFunction.
>
> First I realized I can’t use TestHarness for  ProcessAllWindowFunction,
> because it doesn’t have a processElement method. In this case, what unit
> test strategy should I follow?
>
> *Diogo Araújo* | Rockstar Developer
> diogo.ara...@criticaltechworks.com
> +351 912882824
> Rua do Campo Alegre, nº 17, piso 0 | 4150-177 Porto
> www.criticaltechworks.com
> 
>
>
>


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread bupt_ljy
Hi all,


Firstly thanks @tison for bringing this up and strongly +1 for the overall design. 


I’d like to add one more example of "multiple jobs in one program" with what 
I’m currently working on. I’m trying to run a TPC-DS benchmark 
(including tens of SQL query jobs) on Flink and suffering a lot from 
maintaining the client, because I can’t run this program in per-job mode and 
have to keep the client attached. 


Back to our discussion, I can see now there is a divergence on whether to compile 
the job graph in the client or in the ClusterEntrypoint, and upsides and downsides 
exist either way. As for the opt-in solution, I have a question: what if the 
user chooses detach mode, compiles in the client, and runs a multi-job program 
at the same time? It's still not going to work.
Besides, by adding a compiling option, we need to consider more things when 
submitting a job, like "Does my program include multiple jobs?" or "Does the 
program need to be initialized before submitting to a remote cluster?", which 
looks a bit complicated and confusing to me.


To summarize, I'll vote for the new per-program concept, but I may not prefer 
the opt-in option mentioned in the mailing list; maybe we need to reconsider 
a better concept and definition that is easy to understand.




Best,
Jiayi Liao


 Original Message 
Sender: Rong Rong
Recipient: Regina" 
Cc: Theo Diefenthal; 
u...@flink.apache.org
Date: Friday, Nov 1, 2019 11:01
Subject: Re: [DISCUSS] Semantic and implementation of per-job mode


Hi All,


Thanks @Tison for starting the discussion; I think we have a very similar 
scenario to Theo's use cases. 
In our case we also generate the job graph using a client service (which 
serves job graph generation for multiple user programs), and we've found 
that managing the upload/download between the cluster and the DFS is tricky 
and error-prone. In addition, managing the different environments and 
requirements of different users in a single service poses even more trouble 
for us.


However, shifting the job graph generation towards the cluster side also 
requires some thoughts regarding how to manage the driver-job as well as some 
dependencies conflicts - In the case for shipping the job graph generation to 
the cluster, some unnecessary dependencies for the runtime will be pulled in by 
the driver-job (correct me if I were wrong Theo)



I think in general I agree with @Gyula's main point: unless there is a very 
strong reason, it is better if we put the driver-mode as an opt-in (at least at 
the beginning). 

I left some comments on the document as well. Please kindly take a look.


Thanks,
Rong


On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:

Yeah just chiming in this conversation as well. We heavily use multiple job 
graphs to get isolation around retry logic and resource allocation across the 
job graphs. Putting all these parallel flows into a single graph would mean 
sharing of TaskManagers across what was meant to be truly independent.
 
We also build our job graphs dynamically based off of the state of the world at 
the start of the job. While we’ve had a share of the pain described, my 
understanding is that there would be a tradeoff in number of jobs being 
submitted to the cluster and corresponding resource allocation requests. In the 
model with multiple jobs in a program, there’s at least the opportunity to 
reuse idle taskmanagers. 
 
 
 
 
From: Theo Diefenthal  
 Sent: Thursday, October 31, 2019 10:56 AM
 To: u...@flink.apache.org
 Subject: Re: [DISCUSS] Semantic and implementation of per-job mode
 
I agree with Gyula Fora,
 
In our case, we have a client-machine in the middle between our YARN cluster 
and some backend services, which can not be reached directly from the cluster 
nodes. On application startup, we connect to some external systems, get some 
information crucial for the job runtime and finally build up the job graph to 
be committed.
 
It is true that we could workaround this, but it would be pretty annoying to 
connect to the remote services, collect the data, upload it to HDFS, start the 
job and make sure, housekeeping of those files is also done at some later time. 
 
The current behavior also corresponds to the behavior of Sparks driver mode, 
which made the transition from Spark to Flink easier for us. 
 
But I see the point, especially in terms of Kubernetes, and would thus also vote 
for an opt-in solution, with client-side compilation as the default and an 
option for the per-program mode as well.
 
Best regards
 
Von: "Flavio Pompermaier" 
 An: "Yang Wang" 
 CC: "tison" , "Newport, Billy" , 
"Paul Lam" , "SHI Xiaogang" , 
"dev" , "user" 
 Gesendet: Donnerstag, 31. Oktober 2019 10:45:36
 Betreff: Re: [DISCUSS] Semantic and implementation of per-job mode
 
Hi all, 
we're using a lot the multiple jobs in one program and this is why: when you 
fetch data from a huge number of sources and, for each source, you do some 
transformation and then you 

[jira] [Created] (FLINK-14590) Unify the working directory of Java process and Python process when submitting python jobs via "flink run -py"

2019-10-31 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-14590:
-

 Summary: Unify the working directory of Java process and Python 
process when submitting python jobs via "flink run -py"
 Key: FLINK-14590
 URL: https://issues.apache.org/jira/browse/FLINK-14590
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Wei Zhong


Assume we enter this flink directory with following structure:
{code:java}
flink/
  bin/
  flink
  pyflink-shell.sh
  python-gateway-server.sh
  ...
  bad_case/
   word_count.py
   data.txt
  lib/...
  opt/...{code}
 And the word_count.py has such a piece of code:
{code:java}
t_config = TableConfig()
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env, t_config)

env._j_stream_execution_environment.registerCachedFile("data", 
"bad_case/data.txt")

with open("bad_case/data.txt", "r") as f:
content = f.read()
elements = [(word, 1) for word in content.split(" ")]
t_env.from_elements(elements, ["word", "count"]){code}
Then we enter the "flink" directory and run:
{code:java}
bin/flink run -py bad_case/word_count.py
{code}
The program will fail at the line of "with open("bad_case/data.txt", "r") as 
f:".

This is because the working directory of the Java process is the current directory, 
but the working directory of the Python process is a temporary directory.

So there is no problem when a relative path is used in an API call to the Java 
process. But if a relative path is used elsewhere, such as in native file 
access, it will fail, because the working directory of the Python process has been 
changed to a temporary directory that is not known to users.

I think this will cause some confusion for users, especially after we support 
dependency management. It would be great if we unified the working directories of 
the Java process and the Python process.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Checkstyle-IDEA plugin For Java

2019-10-31 Thread tison
If you can import the checkstyle rules file, the version of the Checkstyle plugin
is not very important.

We don't use nightly feature IIRC.

Best,
tison.


yanjun qiu  wrote on Thu, Oct 31, 2019 at 7:22 PM:

> Hi Community,
> I want to contribute code to Flink and I have followed the IDE set up
> guide as below:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/ide_setup.html#checkstyle-for-java
> <
> https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/ide_setup.html#checkstyle-for-java
> >
>
> I have installed the Checkstyle-IDEA plugin in my IntelliJ IDEA 2019.2, but I
> found that it didn’t have Checkstyle Version 8.14.
>
> I want to know which IDEA version or Checkstyle version should be installed.
>
> Regards,
> Bruce


Re: [ANNOUNCE] Becket Qin joins the Flink PMC

2019-10-31 Thread Guowei Ma
Congratulations, Becket!
Best,
Guowei


Steven Wu  wrote on Fri, Nov 1, 2019 at 6:20 AM:

> Congratulations, Becket!
>
> On Wed, Oct 30, 2019 at 9:51 PM Shaoxuan Wang  wrote:
>
> > Congratulations, Becket!
> >
> > On Mon, Oct 28, 2019 at 6:08 PM Fabian Hueske  wrote:
> >
> > > Hi everyone,
> > >
> > > I'm happy to announce that Becket Qin has joined the Flink PMC.
> > > Let's congratulate and welcome Becket as a new member of the Flink PMC!
> > >
> > > Cheers,
> > > Fabian
> > >
> >
>


Re: [ANNOUNCE] Becket Qin joins the Flink PMC

2019-10-31 Thread Steven Wu
Congratulations, Becket!

On Wed, Oct 30, 2019 at 9:51 PM Shaoxuan Wang  wrote:

> Congratulations, Becket!
>
> On Mon, Oct 28, 2019 at 6:08 PM Fabian Hueske  wrote:
>
> > Hi everyone,
> >
> > I'm happy to announce that Becket Qin has joined the Flink PMC.
> > Let's congratulate and welcome Becket as a new member of the Flink PMC!
> >
> > Cheers,
> > Fabian
> >
>


Re: [VOTE] Accept Stateful Functions into Apache Flink

2019-10-31 Thread Greg Hogan
+1 (binding)

Thank you to Stephan and all current and future contributors to this tool!

On Thu, Oct 31, 2019 at 4:24 AM Vijay Bhaskar 
wrote:

> +1 from me
>
> Regards
> Bhaskar
>
> On Thu, Oct 31, 2019 at 11:42 AM Gyula Fóra  wrote:
>
> > +1 from me, this is a great addition to Flink!
> >
> > Gyula
> >
> > On Thu, Oct 31, 2019, 03:52 Yun Gao 
> wrote:
> >
> > > +1 (non-binding)
> > > Very thanks for bringing this to the community!
> > >
> > >
> > > --
> > > From:jincheng sun 
> > > Send Time:2019 Oct. 31 (Thu.) 10:22
> > > To:dev 
> > > Cc:Vasiliki Kalavri 
> > > Subject:Re: [VOTE] Accept Stateful Functions into Apache Flink
> > >
> > > big +1 (binding)
> > >
> > > Andrey Zagrebin 于2019年10月30日 周三23:45写道:
> > >
> > > > sorry, my +1 was non-binding, confused that it was not a committer
> vote
> > > but
> > > > PMC.
> > > >
> > > > On Wed, Oct 30, 2019 at 4:43 PM Chesnay Schepler  >
> > > > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On 30/10/2019 15:25, Vasiliki Kalavri wrote:
> > > > > > +1 (binding) from me. I hope this is not too late :)
> > > > > >
> > > > > > Thank you for this great contribution!
> > > > > >
> > > > > > On Wed, 30 Oct 2019 at 14:45, Stephan Ewen 
> > wrote:
> > > > > >
> > > > > >> Thank you all for voting.
> > > > > >>
> > > > > >> The voting period has passed, but only 13 PMC members have voted
> > so
> > > > far,
> > > > > >> that is less than 2/3rd of the PMCs (17 members).
> > > > > >>
> > > > > >> I will take a few days to ping other members to vote, after that
> > we
> > > > will
> > > > > >> gradually lower the threshold as per the process to account for
> > > > inactive
> > > > > >> members.
> > > > > >>
> > > > > >> Best,
> > > > > >> Stephan
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Tue, Oct 29, 2019 at 6:20 PM Seth Wiesman <
> sjwies...@gmail.com
> > >
> > > > > wrote:
> > > > > >>
> > > > > >>> +1 (non-binding)
> > > > > >>>
> > > > > >>> Seth
> > > > > >>>
> > > > >  On Oct 23, 2019, at 9:31 PM, Jingsong Li <
> > jingsongl...@gmail.com>
> > > > > >> wrote:
> > > > >  +1 (non-binding)
> > > > > 
> > > > >  Best,
> > > > >  Jingsong Lee
> > > > > 
> > > > > > On Wed, Oct 23, 2019 at 9:02 PM Yu Li 
> > wrote:
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Best Regards,
> > > > > > Yu
> > > > > >
> > > > > >
> > > > > >> On Wed, 23 Oct 2019 at 16:56, Haibo Sun  >
> > > > wrote:
> > > > > >>
> > > > > >> +1 (non-binding)Best,
> > > > > >> Haibo
> > > > > >>
> > > > > >>
> > > > > >> At 2019-10-23 09:07:41, "Becket Qin" 
> > > > wrote:
> > > > > >>> +1 (binding)
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>>
> > > > > >>> Jiangjie (Becket) Qin
> > > > > >>>
> > > > > >>> On Tue, Oct 22, 2019 at 11:44 PM Tzu-Li (Gordon) Tai <
> > > > > > tzuli...@apache.org
> > > > > >>> wrote:
> > > > > >>>
> > > > >  +1 (binding)
> > > > > 
> > > > >  Gordon
> > > > > 
> > > > >  On Tue, Oct 22, 2019, 10:58 PM Zhijiang <
> > > > > >> wangzhijiang...@aliyun.com
> > > > >  .invalid>
> > > > >  wrote:
> > > > > 
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Best,
> > > > > > Zhijiang
> > > > > >
> > > > > >
> > > > > >
> > > > > --
> > > > > > From:Zhu Zhu 
> > > > > > Send Time:2019 Oct. 22 (Tue.) 16:33
> > > > > > To:dev 
> > > > > > Subject:Re: [VOTE] Accept Stateful Functions into Apache
> > > Flink
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Thanks,
> > > > > > Zhu Zhu
> > > > > >
> > > > > > Biao Liu  于2019年10月22日周二 上午11:06写道:
> > > > > >
> > > > > >> +1 (non-binding)
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Biao /'bɪ.aʊ/
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >>> On Tue, 22 Oct 2019 at 10:26, Jark Wu <
> imj...@gmail.com>
> > > > > wrote:
> > > > > >>>
> > > > > >>> +1 (non-binding)
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Jark
> > > > > >>>
> > > > > >>> On Tue, 22 Oct 2019 at 09:38, Hequn Cheng <
> > > > > chenghe...@gmail.com
> > > > > > wrote:
> > > > >  +1 (non-binding)
> > > > > 
> > > > >  Best, Hequn
> > > > > 
> > > > >  On Tue, Oct 22, 2019 at 9:21 AM Dian Fu <
> > > > > > dian0511...@gmail.com>
> > > > > > wrote:
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Regards,
> > > > > > Dian
> > > > > >
> > > > > >> 在 

[jira] [Created] (FLINK-14589) Redundant slot requests with the same AllocationID leads to inconsistent slot table

2019-10-31 Thread Hwanju Kim (Jira)
Hwanju Kim created FLINK-14589:
--

 Summary: Redundant slot requests with the same AllocationID leads 
to inconsistent slot table
 Key: FLINK-14589
 URL: https://issues.apache.org/jira/browse/FLINK-14589
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.6.3
Reporter: Hwanju Kim


_NOTE: We found this issue in 1.6.2, but I checked the relevant code is still 
in the mainline. What I am not sure, however, is what other slot-related fixes 
after 1.6.2 (such as FLINK-11059 and FLINK-12863, etc) would prevent the 
initial cause of this issue from happening. So far, I have not found the 
related fix to the issue I am describing here, so opening this issue. Please 
feel free to deduplicate this if another one already covers it. Please note 
that we have already picked FLINK-9912, which turned out to be a major fix to 
slot allocation failure issue. I will note the ramification to that issue just 
in case others experience the same problem)._
h2. Summary

When *requestSlot* is called from ResourceManager (RM) to TaskManager (TM), TM 
firstly reserves the requested slot marking it as ALLOCATED, offers the slot to 
JM, and marks the slot as ACTIVE once getting acknowledgement from JM. This 
three-way communication for slot allocation is identified by AllocationID, 
which is generated by JM initially. The way TM reserves a slot is by calling 
*TaskSlotTable.allocateSlot* if the requested slot number (i.e., slot index) is 
free to use. The major data structure is *TaskSlot* indexed by slot index. Once 
the slot is marked as ALLOCATED with a given AllocationID, it tries to update 
other maps such as *allocationIDTaskSlotMap* keyed by AllocationID and 
*slotsPerJob* keyed by JobID. Here when updating *allocationIDTaskSlotMap*, 
it's directly using *allocationIDTaskSlotMap.put(allocationId, taskSlot)*, 
which may overwrite existing entry, if one is already there with the same 
AllocationID. This would render inconsistency between *TaskSlot* and 
*allocationIDTaskSlotMap*, where the former says two slots are allocated by the 
same AllocationID and the latter says the AllocationID only has the latest task 
slot. With this state, once the slot is freed, *freeSlot* is driven by 
AllocationID, so it fetches slot index (i.e., the latter one that has arrived 
later) from *allocationIDTaskSlotMap*, marks the slot free, and removes it from 
*allocationIDTaskSlotMap*. But still the old task slot is marked as allocated. 
This old task slot becomes zombie and can never be freed. This can cause 
permanent slot allocation failure if TM slots are statically and tightly 
provisioned and resource manager is not actively spawning new TMs where 
unavailable (e.g., Kubernetes without active mode integration, which is not yet 
available).
h2. Scenario

From my observation, the redundant slot requests with the same AllocationID 
and different slot indices should be rare but can happen with race condition 
especially when repeated fail-over and heartbeat timeout (primarily caused by 
transient resource overload, not permanent network partition/node outage) are 
taking place. The following is a detailed scenario, which could lead to this 
issue (AID is AllocationID):
 # AID1 is requested from JM and put in the pending request queue in RM.
 # RM picks up slot number 1 (Slot1) from freeSlots and performs requestSlot 
with Slot1 and AID1. Here this slot request is on the fly.
 # In the meantime, Slot1 is occupied by AID2 in TM for a delayed slot request 
and TM sends slot report via heartbeat to RM saying Slot1 is already allocated 
with AID2.
 # RM's heartbeat handler identifies that Slot1 is occupied with a different 
AID (AID2) so that it should reject the pending request sent from step 2.
 # handleFailedSlotRequest puts the rejected AID1 to pending request again by 
retrying the slot request. Now it picks up another available slot, say Slot2. 
So, the retried slot request with Slot 2 and AID1 is on the fly.
 # In the meantime, Slot1 occupied by AID2 is freed (by any disconnection with 
JM, or releasing all the tasks in the slot on cancellation/failure - the latter 
was observed).
 # The in-flight slot request (Slot1, AID1) from step 2 arrives at TM, and it's 
succeeded as Slot1 is free to allocate. TM offers the Slot1 to JM, which 
acknowledges it so that TM marks Slot1 ACTIVE with AID1. As this point, 
allocationIDTaskSlotMap[AID1] = Slot1 in TM. JM's allocatedSlots[AID1] = Slot1.
 # The next in-flight slot request (Slot2, AID1) from step 5 arrives at TM. As 
Slot2 is still free, TM marks it ALLOCATED and offers Slot2 to JM and 
*"overwrite allocationIDTaskSlotMap[AID1] to Slot2"*
 # In step 7, JM has allocatedSlots[AID1] = Slot1, which leads JM to reject the 
offer as the same AID is already occupied by another slot.
 # TM gets the rejected offer for (Slot2, AID1) and frees 

FW: Unit testing for ProcessAllWindowFunction

2019-10-31 Thread Diogo Araújo
Good afternoon,

After Reading the official flink testing documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html)
I was able to develop tests for a ProcessFunction, using a Test Harness, 
something like this:



pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")

testHarness =
  new 
OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData,PendingPartition](
new 
ProcessOperator[StaticAdequacyTilePublishedData,PendingPartition](pendingPartitionBuilder)
  )

testHarness.open()



now, I’m trying to do the same for a ProcessAllWindowFunction.
First I realized I can’t use TestHarness for  ProcessAllWindowFunction, because 
it doesn’t have a processElement method. In this case, what unit test strategy 
should I follow?


Diogo Araújo | Rockstar Developer
diogo.ara...@criticaltechworks.com
+351 912882824
Rua do Campo Alegre, nº 17, piso 0 | 4150-177 Porto
www.criticaltechworks.com



From: Diogo Araújo 
Date: Thursday, 31 October 2019 at 16:55
To: "u...@flink.apache.org" 
Subject: Unit testing for ProcessAllWindowFunction

Good afternoon,

After Reading the official flink testing documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html)
I was able to develop tests for a ProcessFunction, using a Test Harness, 
something like this:



pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")

testHarness =
  new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, 
PendingPartition](
new ProcessOperator[StaticAdequacyTilePublishedData, 
PendingPartition](pendingPartitionBuilder)
  )

testHarness.open()



now, I’m trying to do the same for a ProcessAllWindowFunction.
First I realized I can’t use TestHarness for  ProcessAllWindowFunction, because 
it doesn’t have a processElement method. In this case, what unit test strategy 
should I follow?
Diogo Araújo | Rockstar Developer
diogo.ara...@criticaltechworks.com
+351 912882824
Rua do Campo Alegre, nº 17, piso 0 | 4150-177 Porto
www.criticaltechworks.com




[jira] [Created] (FLINK-14588) Support Hive version 1.0.0 and 1.0.1

2019-10-31 Thread Rui Li (Jira)
Rui Li created FLINK-14588:
--

 Summary: Support Hive version 1.0.0 and 1.0.1
 Key: FLINK-14588
 URL: https://issues.apache.org/jira/browse/FLINK-14588
 Project: Flink
  Issue Type: Task
  Components: Connectors / Hive
Reporter: Rui Li






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14587) HiveTableSourceTest timeouts on Travis

2019-10-31 Thread Gary Yao (Jira)
Gary Yao created FLINK-14587:


 Summary: HiveTableSourceTest timeouts on Travis
 Key: FLINK-14587
 URL: https://issues.apache.org/jira/browse/FLINK-14587
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Tests
Affects Versions: 1.10.0
Reporter: Gary Yao


{{HiveTableSourceTest}} failed in nightly tests running on Travis (stage: misc 
- scala 2.12)

https://travis-ci.org/apache/flink/builds/604945292?utm_source=slack_medium=notification

https://api.travis-ci.org/v3/job/604945309/log.txt

{noformat}
15:54:17.188 [INFO] Results:
15:54:17.188 [INFO] 
15:54:17.188 [ERROR] Errors: 
15:54:17.188 [ERROR]   
HiveTableSourceTest.org.apache.flink.connectors.hive.HiveTableSourceTest » 
Timeout
15:54:17.188 [INFO] 
15:54:17.188 [ERROR] Tests run: 232, Failures: 0, Errors: 1, Skipped: 1
15:54:17.188 [INFO] 
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Checkstyle-IDEA plugin For Java

2019-10-31 Thread yanjun qiu
Hi Community,
I want to contribute code to Flink and I have followed the IDE set up guide as 
below:
https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/ide_setup.html#checkstyle-for-java
 


I have installed the Checkstyle-IDEA plugin in my IntelliJ IDEA 2019.2, but I found 
that it didn’t have Checkstyle Version 8.14.

I want to know which IDEA version or Checkstyle version should be installed.

Regards,
Bruce

[jira] [Created] (FLINK-14586) JobMaster issues promote calls if job is successful

2019-10-31 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-14586:


 Summary: JobMaster issues promote calls if job is successful
 Key: FLINK-14586
 URL: https://issues.apache.org/jira/browse/FLINK-14586
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Simon Su
Hi Till 
Thanks for your reply. Actually I modified the code like this:
I commented out the filter part and re-ran the code, and then it works well! The 
jar passed to createRemoteEnvironment is a udf jar, which does not contain my 
code.
My Flink version is 1.9.0, so I’m confused about the actual behavior of 
‘createRemoteEnvironment’. Is it a potential bug? 


ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, “/tmp/udfs.jar");

DataSet data = env.readTextFile("/tmp/file");

data
//.filter(new FilterFunction() {
//public boolean filter(String value) {
//return value.startsWith("http://");
//}
//})
.writeAsText("/tmp/file313");

env.execute();


Thanks,
SImon


On 10/31/2019 17:23,Till Rohrmann wrote:
In order to run the program on a remote cluster from the IDE you need to first 
build the jar containing your user code. This jar needs to passed to 
createRemoteEnvironment() so that the Flink client knows which jar to upload. 
Hence, please make sure that /tmp/myudf.jar contains your user code.


Cheers,
Till


On Thu, Oct 31, 2019 at 9:01 AM Simon Su  wrote:



Hi all 
   I want to test to submit a job from my local IDE and I deployed a Flink 
cluster in my vm. 
   Here is my code from Flink 1.9 document and add some of my parameters.
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

DataSet data = env.readTextFile("/tmp/file");

data
.filter(new FilterFunction() {
public boolean filter(String value) {
return value.startsWith("http://");
}
})
.writeAsText("/tmp/file1");

env.execute();
}

When I run the program, it raises an error like: 


Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
1f32190552e955bb2048c31930edfb0e)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's 
outputs caused an error: Could not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at 
org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at 
org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at 
org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at 

Re: Builds Notification Update

2019-10-31 Thread Xiyuan Wang
>
> Thank you very much. I'll prepare and start to send the test result from
> next week.
>
>


Re: [DISCUSS] FLIP-83: Flink End-to-end Performance Testing Framework

2019-10-31 Thread Yu Li
Hi Piotr,

Thanks for the comments!

bq. How are you planning to execute the end-to-end benchmarks and integrate
them with our build process?
Great question! We plan to execute the end-to-end benchmark in a small
cluster (like 3 VM nodes) to better reflect network cost, triggering it
through the same Jenkins service we use for the micro benchmark and showing
the results on the codespeed center. Will add these to the FLIP document if
there are no objections.

bq. Are you planning to monitor the throughput and latency at the same time?
Good question. And you're right, we will stress the cluster into
back-pressure and watch the throughput; latency doesn't mean much in the
first test suites. Let me refine the document.

Thanks.

Best Regards,
Yu


On Wed, 30 Oct 2019 at 19:07, Piotr Nowojski  wrote:

> Hi Yu,
>
> Thanks for bringing this up.
>
> +1 for the idea and the proposal from my side.
>
> I think that the proposed Test Job List might be a bit
> redundant/excessive, but:
> - we can always adjust this later, once we have the infrastructure in place
> - as long as we have the computing resources and ability to quickly
> interpret the results/catch regressions, it doesn’t hurt to have more
> benchmarks/tests than strictly necessary.
>
> Which brings me to a question. How are you planning to execute the
> end-to-end benchmarks and integrate them with our build process?
>
> Another smaller question:
>
> > In this initial stage we will only monitor and display job throughput
> and latency.
>
> Are you planning to monitor the throughput and latency at the same time?
> It might be a bit problematic, as when measuring the throughput you want to
> saturate the system and hit some bottleneck, which will cause a
> back-pressure (measuring latency at the same time when system is back
> pressured doesn’t make much sense).
>
> Piotrek
>
> > On 30 Oct 2019, at 11:54, Yu Li  wrote:
> >
> > Hi everyone,
> >
> > We would like to propose FLIP-83 that adds an end-to-end performance
> > testing framework for Flink. We discovered some potential problems
> through
> > such an internal end-to-end performance testing framework before the
> > release of 1.9.0 [1], so we'd like to contribute it to Flink community
> as a
> > supplement to the existing daily run micro performance benchmark [2] and
> > nightly run end-to-end stability test [3].
> >
> > The FLIP document could be found here:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-83%3A+Flink+End-to-end+Performance+Testing+Framework
> >
> > Please kindly review the FLIP document and let us know if you have any
> > comments/suggestions, thanks!
> >
> > [1] https://s.apache.org/m8kcq
> > [2] https://github.com/dataArtisans/flink-benchmarks
> > [3] https://github.com/apache/flink/tree/master/flink-end-to-end-tests
> >
> > Best Regards,
> > Yu
>
>


Re: [Discussion] FLIP-79 Flink Function DDL Support

2019-10-31 Thread Terry Wang
Hi Peter,

I’d like to share some thoughts from my side:
1. what's the syntax to distinguish function language ?
+1 for using `[LANGUAGE JVM|PYTHON] USING JAR`
2. How to persist function language in backend catalog ?
+1 for a separate field in CatalogFunction. But as to the specific 
backend, we may persist it case by case. A special case is how HiveCatalog 
stores the kind of CatalogFunction.
3. do we really need to allow users to set a properties map for a udf?
There are use cases requiring passing external arguments to a udf for sure, 
but the need can also be met by passing arguments to `eval` when calling the udf 
in SQL. 
IMO, there is not much need to support setting a properties map for a udf.

4. Should a catalog implementation be able to decide whether it can take a 
properties map, and which language of a udf it can persist?
IMO, it’s necessary for the catalog implementation to provide such information. But 
for the Flink 1.10 MVP goal, we can just skip this part.



Best,
Terry Wang



> On Oct 30, 2019, at 13:52, Peter Huang  wrote:
> 
> Hi Bowen,
> 
> I can't agree more that we should first reach an agreement on the DDL syntax
> and focus on the MVP in the current phase.
> 
> 1) what's the syntax to distinguish function language
> Currently, there are two opinions:
> 
>   - USING 'python .'
>   - [LANGUAGE JVM|PYTHON] USING JAR '...'
> 
> As we need to support multiple resources as HQL does, we shouldn't repeat the
> language symbol as a suffix of each resource.
> I would prefer option two, but definitely open to more comments.
> 
> 2) How to persist function language in backend catalog? as a k-v pair in
> properties map, or a dedicated field?
> Even though language type is also a property, I think a separate field in
> CatalogFunction is a cleaner solution.
> 
> 3) do we really need to allow users to set a properties map for udf? what needs
> to be stored there? what are they used for?
> 
> I am considering a type of use case that uses UDFs for realtime inference.
> The model is nested in the udf as a resource, but there are multiple
> parameters that are customizable. In this way, the user can use properties
> to define those parameters.
> 
> I only have answers to these questions. For questions about the catalog
> implementation, I hope we can collect more feedback from the community.
> 
> 
> Best Regards
> Peter Huang
> 
> 
> 
> 
> 
> Best Regards
> Peter Huang
> 
> On Tue, Oct 29, 2019 at 11:31 AM Bowen Li  wrote:
> 
>> Hi all,
>> 
>> Besides all the good questions raised above, we all seem to agree to have an
>> MVP for Flink 1.10, "to support users to create and persist a java
>> class-based udf that's already in classpath (no extra resource loading),
>> and use it later in queries".
>> 
>> IIUIC, to achieve that in 1.10, the following are currently the core
>> issues/blockers we should figure out, and solve them as our **highest
>> priority**:
>> 
>> - what's the syntax to distinguish function language (java, scala, python,
>> etc)? we only need to implement the java one in 1.10 but have to settle
>> down the long term solution
>> - how to persist function language in backend catalog? as a k-v pair in
>> properties map, or a dedicated field?
>> - do we really need to allow users to set a properties map for udf? what needs
>> to be stored there? what are they used for?
>> - should a catalog impl be able to decide whether it can take a properties
>> map (if we decide to have one), and which language of a udf it can persist?
>>   - E.g. Hive metastore, which backs Flink's HiveCatalog, cannot take a
>> properties map and is only able to persist java udf [1], unless we do
>> something hacky to it
>> 
>> I feel these questions are essential to Flink functions in the long run,
>> but most importantly, are also the minimum scope for Flink 1.10. Aspects
>> like resource loading security or compatibility with Hive syntax are
>> important too, however if we focus on them now, we may not be able to get
>> the MVP out in time.
>> 
>> [1]
>> -
>> 
>> https://hive.apache.org/javadocs/r3.1.2/api/org/apache/hadoop/hive/metastore/api/Function.html
>> -
>> 
>> https://hive.apache.org/javadocs/r3.1.2/api/org/apache/hadoop/hive/metastore/api/FunctionType.html
>> 
>> 
>> 
>> On Sun, Oct 27, 2019 at 8:22 PM Peter Huang 
>> wrote:
>> 
>>> Hi Timo,
>>> 
>>> Thanks for the feedback. I replied and adjust the design accordingly. For
>>> the concern of class loading.
>>> I think we need to distinguish the function class loading for Temporary
>> and
>>> Permanent function.
>>> 
>>> 1) For Permanent function, we can add it to the job graph so that we
>> don't
>>> need to load it multiple times for the different sessions.
>>> 2) For Temporary function, we can register function with a session key,
>> and
>>> use different class loaders in RuntimeContext implementation.
>>> 
>>> I added more description in the doc. Please review it again.
>>> 
>>> 
>>> Best Regards
>>> Peter Huang
>>> 
>>> 
>>> 
>>> 
>>> On Thu, Oct 24, 2019 at 2:14 AM Timo Walther  wrote:
>>> 

[jira] [Created] (FLINK-14585) flink connector for sink to apache/incubator-druid

2019-10-31 Thread xiaodao (Jira)
xiaodao created FLINK-14585:
---

 Summary: flink connector for sink to apache/incubator-druid
 Key: FLINK-14585
 URL: https://issues.apache.org/jira/browse/FLINK-14585
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Affects Versions: 1.9.1
Reporter: xiaodao


add new flink connector for sink data to apache/incubator-druid,



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Flavio Pompermaier
Hi all,
we use the multiple-jobs-in-one-program pattern a lot, and this is why: when
you fetch data from a huge number of sources and, for each source, you do
some transformation and then you want to write into a single directory the
union of all outputs (this assumes you're doing batch). When the number of
sources is large, if you want to do this in a single job, the graph becomes
very big and this is a problem for several reasons:

    - too many subtasks/threads per slot
   - increase of back pressure
    - if a single "sub-job" fails, the whole job fails; this is very annoying
    if it happens after half a day, for example
   - In our use case, the big-graph mode takes much longer than running
   each job separately (but maybe this is true only if you don't have much
   hardware resources)
   - debugging the cause of a fail could become a daunting task if the job
   graph is too large
    - we faced many strange errors when trying to run the single big-job mode
    (due to serialization corruption)

So, to summarize our overall experience with Flink batch: the simpler the
job graph, the better! (A sketch of the per-table pattern follows below.)
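A minimal sketch of that pattern (paths and table names are made up): one independent job graph per source table, each submitted by its own execute() call from the same program:

import org.apache.flink.api.scala._

def runIngestion(tables: Seq[String]): Unit =
  for (table <- tables) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.readTextFile(s"hdfs:///ingest/$table")
      .filter(_.nonEmpty)                      // per-table transformation goes here
      .writeAsText(s"hdfs:///staging/$table")
    // each execute() submits its own job graph; a failure only affects this table
    env.execute(s"ingest-$table")
  }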

Best,
Flavio


On Thu, Oct 31, 2019 at 10:14 AM Yang Wang  wrote:

> Thanks for tison starting this exciting discussion. We also suffer a lot
> from the per job mode.
> I think the per-job cluster is a dedicated cluster for only one job and
> will not accept more other
> jobs. It has the advantage of one-step submission, do not need to start
> dispatcher first and
> then submit the job. And it does not matter where the job graph is
> generated and job is submitted.
> Now we have two cases.
>
> (1) Current Yarn detached cluster. The job graph is generated in client
> and then use distributed
> cache to flink master container. And the MiniDispatcher uses
> `FileJobGraphRetrieve` to get it.
> The job will be submitted at flink master side.
>
>
> (2) Standalone per job cluster. User jars are already built into image. So
> the job graph will be
> generated at flink master side and `ClasspathJobGraphRetriver` is used to
> get it. The job will
> also be submitted at flink master side.
>
>
> For the (1) and (2), only one job in user program could be supported. The
> per job means
> per job-graph, so it works just as expected.
>
>
> Tison suggests to add a new mode "per-program”. The user jar will be
> transferred to flink master
> container, and a local client will be started to generate job graph and
> submit job. I think it could
> cover all the functionality of current per job, both (1) and (2). Also the
> detach mode and attach
> mode could be unified. We do not need to start a session cluster to
> simulate per job for multiple parts.
>
>
> I am in favor of the “per-program” mode. Just two concerns.
> 1. How many users are using multiple jobs in one program?
> 2. Why do not always use session cluster to simulate per job? Maybe
> one-step submission
> is a convincing reason.
>
>
>
> Best,
> Yang
>
> tison  wrote on Thu, Oct 31, 2019 at 9:18 AM:
>
>> Thanks for your attentions!
>>
>> @shixiaoga...@gmail.com 
>>
>> Yes correct. We try to avoid jobs affect one another. Also a local
>> ClusterClient
>> in case saves the overhead about retry before leader elected and persist
>> JobGraph before submission in RestClusterClient as well as the net cost.
>>
>> @Paul Lam 
>>
>> 1. Here is already a note[1] about multiple part jobs. I am also confused
>> a bit
>> on this concept at first :-) Things go in similar way if you program
>> contains the
>> only JobGraph so that I think per-program acts like per-job-graph in this
>> case
>> which provides compatibility for many of one job graph program.
>>
>> Besides, we have to respect user program which doesn't with current
>> implementation because we return abruptly when calling env#execute which
>> hijack user control so that they cannot deal with the job result or the
>> future of
>> it. I think this is why we have to add a detach/attach option.
>>
>> 2. For the compilation part, a workaround could be that you upload
>> those
>> resources to a commonly known address such as HDFS, so that compilation
>> can read them from either the client or the cluster.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>>
>>
>> Newport, Billy  于2019年10月30日周三 下午10:41写道:
>>
>>> We execute multiple job graphs routinely because we cannot submit a
>>> single graph without it blowing up. I believe Regina spoke to this in
>>> Berlin during her talk. Instead, if we are processing a database
>>> ingestion with 200 tables in it, we do a job graph per table rather than a
>>> single job graph that handles all tables. A single job graph can be in
>>> the tens of thousands of nodes in our largest cases, and we have found Flink
>>> (as of 1.3/1.6.4) cannot handle graphs of that size. We’re currently
>>> testing 1.9.1 but have not retested the large graph scenario.
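For illustration, here is a minimal, hypothetical sketch of that job-graph-per-table pattern; the table list, paths and pipeline are placeholders, not actual code from this thread:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

public class PerTableJobs {

    public static void main(String[] args) throws Exception {
        List<String> tables = Arrays.asList("table_a", "table_b", "table_c");

        for (String table : tables) {
            // A fresh environment per table keeps each job graph small.
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> rows = env.readTextFile("/data/" + table);
            rows.writeAsText("/output/" + table);

            // One execute() per table submits one small, independent job graph;
            // a failure only affects this table instead of the whole ingestion.
            env.execute("ingest-" + table);
        }
    }
}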
>>>
>>>
>>>
>>> 

Reminder: please update FLIP document to latest status

2019-10-31 Thread Yu Li
Dear all,

While checking 1.10 release progress, we found that some FLIPs have already been
accepted through voting but are still marked as "Under Discussion" in the wiki
(e.g. FLIP-68), and some are marked as implementing but lack a JIRA
link (e.g. FLIP-64). So just a reminder to our FLIP authors/owners:
please check your FLIP document and update it to the latest status when
possible. Many thanks.

Best Regards,
Yu


[jira] [Created] (FLINK-14584) Support complex data types in Python user-defined functions

2019-10-31 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-14584:


 Summary: Support complex data types in Python user-defined 
functions
 Key: FLINK-14584
 URL: https://issues.apache.org/jira/browse/FLINK-14584
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.10.0


This jira is a sub-task of FLINK-14388. In this jira, complex data types, including 
ArrayType, DecimalType, MapType and MultisetType, will be supported for 
Python UDFs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Till Rohrmann
In order to run the program on a remote cluster from the IDE, you need to
first build the jar containing your user code. This jar needs to be passed
to createRemoteEnvironment() so that the Flink client knows which jar to
upload. Hence, please make sure that /tmp/myudf.jar contains your user code.
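For reference, a minimal sketch of that setup; "target/my-job.jar" is a hypothetical name for whatever artifact your build produces (e.g. via mvn clean package):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class TestMain {
    public static void main(String[] args) throws Exception {
        // The jar passed here must be the packaged artifact of this very project,
        // so that the anonymous FilterFunction class (TestMain$1) can be shipped
        // to the cluster and loaded by the task managers.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "localhost", 8081, "target/my-job.jar");

        env.readTextFile("/tmp/file")
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) {
                        return value.startsWith("http://");
                    }
                })
                .writeAsText("/tmp/file1");

        env.execute();
    }
}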

Cheers,
Till

On Thu, Oct 31, 2019 at 9:01 AM Simon Su  wrote:

>
> Hi all
> I want to test submitting a job from my local IDE to a Flink
> cluster that I deployed in my VM.
> Here is my code from the Flink 1.9 documentation, with some of my parameters added.
>
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment
> .createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");
>
> DataSet<String> data = env.readTextFile("/tmp/file");
>
> data
> .filter(new FilterFunction<String>() {
> public boolean filter(String value) {
> return value.startsWith("http://");
> }
> })
> .writeAsText("/tmp/file1");
>
> env.execute();
> }
>
> When I run the program, it raises an error like:
>
> Exception in thread "main"
> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 1f32190552e955bb2048c31930edfb0e)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
> at
> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
> at
> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
> at
> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> at TestMain.main(TestMain.java:25)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 8 more
> *Caused by: java.lang.RuntimeException: The initialization of the
> DataSource's outputs caused an error: Could not read the user code wrapper:
> TestMain$1*
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> *Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not read the user code wrapper: TestMain$1*
> at
> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
> at
> org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
> at
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
> at
> org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
> at
> org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
> at
> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
> ... 3 more
> *Caused by: java.lang.ClassNotFoundException: TestMain$1*
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at 

[RESULT][VOTE] FLIP-70: Flink SQL Computed Column Design

2019-10-31 Thread Danny Chan
Hi all,

The voting time for FLIP-70 has passed. I'm closing the vote now.

There were seven +1 votes, 3 of which are binding:
- David (binding)
- Jark Wu (binding)
- Kurt Young (binding)

- Benchao Li (non-binding)
- Terry Wang (non-binding)
- Jingsong Li (non-binding)
- Zhenghua Gao (non-binding)

There were no disapproving votes.

Thus, FLIP-70 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

Best,
Danny Chan


Re: possible backwards compatibility issue between 1.8->1.9?

2019-10-31 Thread Piotr Nowojski
Hi,

(This question is more appropriate for the user mailing list, not dev - when 
responding to my e-mail please remove the dev mailing list from the recipients; 
I’ve kept it just as an FYI that the discussion has moved to the user mailing list).

Could it be that the problem is caused by the changes in the chaining strategy of the 
AsyncWaitOperator in 1.9, as explained in the release notes [1]?

> AsyncIO
> Due to a bug in the AsyncWaitOperator, in 1.9.0 the default chaining 
> behaviour of the operator is now changed so that it 
> is never chained after another operator. This should not be problematic for 
> migrating from older version snapshots as 
> long as an uid was assigned to the operator. If an uid was not assigned to 
> the operator, please see the instructions here [2]
> for a possible workaround.
>
> Related issues:
>
>   • FLINK-13063: AsyncWaitOperator shouldn’t be releasing 
> checkpointingLock [3]

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.9.html#asyncio
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/upgrading.html#matching-operator-state
 

[3] https://issues.apache.org/jira/browse/FLINK-13063 
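As a side note, a hedged sketch of assigning an explicit uid to the async operator, as the quoted release note recommends; the async function, uid string and timeout below are hypothetical, not taken from the affected job:

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class AsyncUidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("a", "b", "c");

        AsyncDataStream
                .unorderedWait(input, new AsyncFunction<String, String>() {
                    @Override
                    public void asyncInvoke(String value, ResultFuture<String> resultFuture) {
                        resultFuture.complete(Collections.singletonList(value.toUpperCase()));
                    }
                }, 1000, TimeUnit.MILLISECONDS)
                // An explicit, stable uid lets the async operator's state be matched
                // on restore even when the chaining-based auto-generated ids change.
                .uid("my-async-operator")
                .print();

        env.execute("async-uid-example");
    }
}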


> On 30 Oct 2019, at 16:52, Bekir Oguz  wrote:
> 
> Hi guys,
> during our upgrade from 1.8.1 to 1.9.1, one of our jobs fails to start with
> the following exception. We deploy the job with the 'allow-non-restored-state'
> option and from the latest checkpoint dir of the 1.8.1 version.
> 
> org.apache.flink.util.StateMigrationException: The new state typeSerializer
> for operator state must not be incompatible.
>at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:323)
>at org.apache.flink.runtime.state.DefaultOperatorStateBackend
> .getListState(DefaultOperatorStateBackend.java:214)
>at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
> .initializeState(AsyncWaitOperator.java:272)
>at org.apache.flink.streaming.api.operators.AbstractStreamOperator
> .initializeState(AbstractStreamOperator.java:281)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(
> StreamTask.java:881)
>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> .java:395)
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>at java.lang.Thread.run(Thread.java:748)
> 
> We see from the Web UI that the 'async wait operator' is causing this,
> although it was not changed at all during this upgrade.
> 
> All other jobs are migrated without problems, only this one is failing. Has
> anyone else experienced this during migration?
> 
> Regards,
> Bekir Oguz



Re: [VOTE] Accept Stateful Functions into Apache Flink

2019-10-31 Thread Vijay Bhaskar
+1 from me

Regards
Bhaskar

On Thu, Oct 31, 2019 at 11:42 AM Gyula Fóra  wrote:

> +1 from me, this is a great addition to Flink!
>
> Gyula
>
> On Thu, Oct 31, 2019, 03:52 Yun Gao  wrote:
>
> > +1 (non-binding)
> > Very thanks for bringing this to the community!
> >
> >
> > --
> > From:jincheng sun 
> > Send Time:2019 Oct. 31 (Thu.) 10:22
> > To:dev 
> > Cc:Vasiliki Kalavri 
> > Subject:Re: [VOTE] Accept Stateful Functions into Apache Flink
> >
> > big +1 (binding)
> >
> > Andrey Zagrebin 于2019年10月30日 周三23:45写道:
> >
> > > sorry, my +1 was non-binding; I was confused since it was not a committer vote
> > but
> > > a PMC one.
> > >
> > > On Wed, Oct 30, 2019 at 4:43 PM Chesnay Schepler 
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On 30/10/2019 15:25, Vasiliki Kalavri wrote:
> > > > > +1 (binding) from me. I hope this is not too late :)
> > > > >
> > > > > Thank you for this great contribution!
> > > > >
> > > > > On Wed, 30 Oct 2019 at 14:45, Stephan Ewen 
> wrote:
> > > > >
> > > > >> Thank you all for voting.
> > > > >>
> > > > >> The voting period has passed, but only 13 PMC members have voted
> so
> > > far,
> > > > >> that is less than 2/3rd of the PMCs (17 members).
> > > > >>
> > > > >> I will take a few days to ping other members to vote, after that
> we
> > > will
> > > > >> gradually lower the threshold as per the process to account for
> > > inactive
> > > > >> members.
> > > > >>
> > > > >> Best,
> > > > >> Stephan
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Tue, Oct 29, 2019 at 6:20 PM Seth Wiesman  >
> > > > wrote:
> > > > >>
> > > > >>> +1 (non-binding)
> > > > >>>
> > > > >>> Seth
> > > > >>>
> > > >  On Oct 23, 2019, at 9:31 PM, Jingsong Li <
> jingsongl...@gmail.com>
> > > > >> wrote:
> > > >  +1 (non-binding)
> > > > 
> > > >  Best,
> > > >  Jingsong Lee
> > > > 
> > > > > On Wed, Oct 23, 2019 at 9:02 PM Yu Li 
> wrote:
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Best Regards,
> > > > > Yu
> > > > >
> > > > >
> > > > >> On Wed, 23 Oct 2019 at 16:56, Haibo Sun 
> > > wrote:
> > > > >>
> > > > >> +1 (non-binding)Best,
> > > > >> Haibo
> > > > >>
> > > > >>
> > > > >> At 2019-10-23 09:07:41, "Becket Qin" 
> > > wrote:
> > > > >>> +1 (binding)
> > > > >>>
> > > > >>> Thanks,
> > > > >>>
> > > > >>> Jiangjie (Becket) Qin
> > > > >>>
> > > > >>> On Tue, Oct 22, 2019 at 11:44 PM Tzu-Li (Gordon) Tai <
> > > > > tzuli...@apache.org
> > > > >>> wrote:
> > > > >>>
> > > >  +1 (binding)
> > > > 
> > > >  Gordon
> > > > 
> > > >  On Tue, Oct 22, 2019, 10:58 PM Zhijiang <
> > > > >> wangzhijiang...@aliyun.com
> > > >  .invalid>
> > > >  wrote:
> > > > 
> > > > > +1 (non-binding)
> > > > >
> > > > > Best,
> > > > > Zhijiang
> > > > >
> > > > >
> > > > >
> > > > --
> > > > > From:Zhu Zhu 
> > > > > Send Time:2019 Oct. 22 (Tue.) 16:33
> > > > > To:dev 
> > > > > Subject:Re: [VOTE] Accept Stateful Functions into Apache
> > Flink
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > Biao Liu  于2019年10月22日周二 上午11:06写道:
> > > > >
> > > > >> +1 (non-binding)
> > > > >>
> > > > >> Thanks,
> > > > >> Biao /'bɪ.aʊ/
> > > > >>
> > > > >>
> > > > >>
> > > > >>> On Tue, 22 Oct 2019 at 10:26, Jark Wu 
> > > > wrote:
> > > > >>>
> > > > >>> +1 (non-binding)
> > > > >>>
> > > > >>> Best,
> > > > >>> Jark
> > > > >>>
> > > > >>> On Tue, 22 Oct 2019 at 09:38, Hequn Cheng <
> > > > chenghe...@gmail.com
> > > > > wrote:
> > > >  +1 (non-binding)
> > > > 
> > > >  Best, Hequn
> > > > 
> > > >  On Tue, Oct 22, 2019 at 9:21 AM Dian Fu <
> > > > > dian0511...@gmail.com>
> > > > > wrote:
> > > > > +1 (non-binding)
> > > > >
> > > > > Regards,
> > > > > Dian
> > > > >
> > > > >> 在 2019年10月22日,上午9:10,Kurt Young 
> 写道:
> > > > >>
> > > > >> +1 (binding)
> > > > >>
> > > > >> Best,
> > > > >> Kurt
> > > > >>
> > > > >>
> > > > >> On Tue, Oct 22, 2019 at 12:56 AM Fabian Hueske <
> > > > > fhue...@gmail.com>
> > > > > wrote:
> > > > >>> +1 (binding)
> > > > >>>
> > > > >>> Am Mo., 21. Okt. 2019 um 16:18 Uhr schrieb Thomas
> > Weise <
> > > >  t...@apache.org
> > > 

[jira] [Created] (FLINK-14583) Remove progressLock from ExecutionGraph

2019-10-31 Thread vinoyang (Jira)
vinoyang created FLINK-14583:


 Summary: Remove progressLock from ExecutionGraph
 Key: FLINK-14583
 URL: https://issues.apache.org/jira/browse/FLINK-14583
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: vinoyang


There is a lock object named {{progressLock}} which is used to secure all access 
to mutable fields, especially the tracking of progress within the job. Now, the 
{{ExecutionGraph}} can only be accessed by a single thread at any point in time. 
IMO, we can remove this lock.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14582) Do not upload {uuid}-taskmanager-conf.yaml for each task manager container

2019-10-31 Thread Yang Wang (Jira)
Yang Wang created FLINK-14582:
-

 Summary: Do not upload {uuid}-taskmanager-conf.yaml for each task 
manager container
 Key: FLINK-14582
 URL: https://issues.apache.org/jira/browse/FLINK-14582
 Project: Flink
  Issue Type: Improvement
Reporter: Yang Wang


Currently, when we {{createTaskExecutorLaunchContext}}, a new task manager conf 
is created and uploaded to HDFS for every container. This time cost is unnecessary and 
could be reduced by using Java options of the task manager main class instead.

By reducing the time spent launching containers in {{onContainersAllocated}}, we could 
avoid allocating excess containers as 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Introduce a location-oriented two-stage query mechanism toimprove the queryable state.

2019-10-31 Thread vino yang
Hi Yu,

Thanks for your suggestion. Yes, the community is missing some attention
to, and exploration of, the application scenarios (even for the state processing
API that was recently released).

I do have some frustration because I have already implemented and verified this
feature. But it doesn't matter; let's look ahead.

I think Stateful Functions is a step in this direction. I hope that more
people will pay attention to the value of state exploration and expansion
in the future.

Best,
Vino

Yu Li  于2019年10月30日周三 下午9:53写道:

> Thanks for bringing this up again Vino.
>
> Considering the current status that the community lacks committer
> bandwidth on QueryableState [1] [2], I think David has made a good
> point/concern: we need to collect more information about valid
> in-production QueryableState use cases to draw more attention to this
> module (I have collected some, and there are indeed scattered requirements
> around QueryableState, but let's open a separate thread to explicitly discuss
> this, later). IMHO this is the precondition for putting review
> resources into improving the module.
>
> I believe there will be committer bandwidth on this, sooner or later, but
> we need to go further by ourselves before that happens, and what we need is
> more patience. Hope this won't frustrate you too much, and please correct
> me if any committer would like to put more effort into this. Thanks.
>
> [1] https://s.apache.org/MaOl
> [2] https://s.apache.org/r8k8a
>
> Best Regards,
> Yu
>
>
> On Fri, 25 Oct 2019 at 23:59, vino yang  wrote:
>
>> Hi David,
>>
>> I know that in some scenarios the queryable state has limitations.
>> But in other scenarios it is valuable, such as when you need
>> to observe "trends." It can be compared to two existing features
>> provided by the Flink web UI:
>>
>>- Metrics: An instantaneous value that can be queried is like a
>>business metric of Flink jobs;
>>- BackPressure: A state query is like instantaneous sampling, and a
>>periodic query is similar to periodic sampling;
>>
>> I just want to say that although our state gets the semantic support
>> of "correctness", users don't always pay attention to (or use) the
>> final correct result. Sometimes they want to observe the trend of the value
>> of a key within one job run cycle. When the job is resumed, it goes
>> into a new run cycle, and they can continue to observe its trend.
>>
>> The queryable state is oriented to KV state, so there is no problem with
>> point-oriented queries. Several months ago, I also asked whether
>> the window state could be queried [1]. But this topic did not attract much
>> attention; maybe everyone was busy releasing 1.9 at that moment. I see there
>> was a project named "query-window-example" from dataArtisans that made an
>> effort to do this job.[2]
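For context, a hedged sketch of such a point-oriented query with the queryable state client; the host, port, job id, state name and key below are hypothetical placeholders, and the state descriptor must match the one registered by the running job:

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class TrendQuery {
    public static void main(String[] args) throws Exception {
        // Talks to the queryable state proxy of a running job (default port 9069).
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        JobID jobId = JobID.fromHexString(args[0]); // id of the running job
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("trend", Types.LONG);

        // Point query: fetch the current state value for one key of one job.
        CompletableFuture<ValueState<Long>> future = client.getKvState(
                jobId, "trend-query", "some-key", BasicTypeInfo.STRING_TYPE_INFO, descriptor);

        System.out.println(future.get().value());
        client.shutdownAndWait();
    }
}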
>>
>> There was another ML thread which asked "Making broadcast state
>> queryable?".[3]
>>
>> So, I really want to know what the community thinks about the
>> queryable state.
>>
>> If the community feels that it still has some meaning, then we can
>> continue to improve it and see how it will work in the future.
>>
>> Best,
>> Vino
>>
>> [1]:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201907.mbox/%3CCAA_=o7bn3nkbsoeayfhxpdteeffxdswzrp2wpyjwp9mnplz...@mail.gmail.com%3E
>> [2]: https://github.com/dataArtisans/query-window-example
>> [3]:
>> http://mail-archives.apache.org/mod_mbox/flink-user/201909.mbox/%3CCAM7-19KYbu0c_==npts9z+ckgycpzqp+i5plgxg6xqaqdn5...@mail.gmail.com%3E
>>
>> David Anderson  于2019年10月25日周五 下午4:56写道:
>>
>>> I've encountered a number of Flink users who considered using
>>> queryable state, but after investigation, decided not to. The reasons
>>> have been:
>>>
>>> (1) The current interface (point queries fetching state for one key)
>>> is too limiting. What some folks really want/need is the ability to
>>> execute SQL-queries against the state.
>>>
>>> (2) The state is not highly available. If the job isn't running, the
>>> state can not be queried. (Hypothetically, a queryable state service
>>> could fall back to querying a snapshot for state for a job that isn't
>>> currently running, but that sounds a bit crazy.)
>>>
>>> (3) During recovery the state can regress, in the sense that it
>>> reflects an earlier point in time than what may've been previously
>>> fetched.
>>>
>>> (4) The state that is wanted (e.g., window state, or operator state)
>>> isn't queryable.
>>>
>>> Best,
>>> David
>>>
>>> On Fri, Oct 25, 2019 at 9:51 AM vino yang  wrote:
>>> >
>>> > Hi Jiayi,
>>> >
>>> > Thanks for your valuable feedback and suggestions.
>>> >
>>> > In our production env, we still have many applications written with the
>>> DataStream
>>> > API.
>>> >
>>> > Currently, we have some requirements that call for ad-hoc queries against the
>>> > running Flink job. The existing query interface is very difficult to
>>> use.
>>> > This improvement is to enhance the usability of the queryable 

RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Simon Su


Hi all 
   I want to test submitting a job from my local IDE to a Flink 
cluster that I deployed in my VM. 
   Here is my code from the Flink 1.9 documentation, with some of my parameters added.
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

DataSet<String> data = env.readTextFile("/tmp/file");

data
.filter(new FilterFunction<String>() {
public boolean filter(String value) {
return value.startsWith("http://");
}
})
.writeAsText("/tmp/file1");

env.execute();
}

When I run the program, it raises an error like: 


Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
1f32190552e955bb2048c31930edfb0e)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's 
outputs caused an error: Could not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at 
org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at 
org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at 
org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more


My understanding is that when using the remote environment, I don’t need to upload 
my program 

[jira] [Created] (FLINK-14581) Support to run Python UDF jobs in a YARN cluster

2019-10-31 Thread Dian Fu (Jira)
Dian Fu created FLINK-14581:
---

 Summary: Support to run Python UDF jobs in a YARN cluster
 Key: FLINK-14581
 URL: https://issues.apache.org/jira/browse/FLINK-14581
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


Currently it throws the following exception when submitting a Python UDF job in a 
YARN cluster:
{code:java}
Caused by: 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.io.IOException: Cannot run program "null/bin/pyflink-udf-runner.sh": 
error=2, No such file or directory
  at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
  at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
  at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
  at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
  at 
org.apache.flink.python.AbstractPythonFunctionRunner.open(AbstractPythonFunctionRunner.java:171)
  at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator$ProjectUdfInputPythonScalarFunctionRunner.open(AbstractPythonScalarFunctionOperator.java:177)
  at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:114)
  at 
org.apache.flink.table.runtime.operators.python.AbstractPythonScalarFunctionOperator.open(AbstractPythonScalarFunctionOperator.java:137)
  at 
org.apache.flink.table.runtime.operators.python.PythonScalarFunctionOperator.open(PythonScalarFunctionOperator.java:70)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:565)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:412)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
  ... 1 more
{code}

The reason is that pyflink-udf-runner.sh is not shipped with the job and is therefore 
not available to the operator.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Builds Notification Update

2019-10-31 Thread Jark Wu
Hi Xiyuan,

I added "i...@openlabtesting.org" to the allow list of
bui...@flink.apache.org.
It should have the right to send email to builds ML now.
Please let me know if you have any questions.

Best,
Jark

On Wed, 30 Oct 2019 at 15:59, Xiyuan Wang  wrote:

> Hi,
>
>   As discussed before, the Flink ARM builds and tests now run as a cron job
> in OpenLab CI. The test results can be sent to this mailing list once a day
> from "i...@openlabtesting.org".  Can you give `i...@openlabtesting.org`
> the right to send e-mail to bui...@flink.apache.org? It would help the community
> to find and check ARM-related problems.
>
> Thanks
>
> Jark Wu  于2019年10月30日周三 上午11:57写道:
>
> > Hi,
> >
> > This is an update and discussion thread to the notification bot and
> builds
> > mailing list.
> >
> > We created the bui...@flink.apache.org mailing list to monitor the
> builds
> > status. And a notification bot is running behind it. You can subscribe
> the
> > mailing list via sending an email to
> >
> > builds-subscr...@flink.apache.org
> >
> > Recently, I updated the bot to only notify for master and release
> branches,
> > because I noticed there were notifications for some testing/temporary
> > branches in recent days; these notifications are noise, so I filtered them out.
> >
> > Additionally, you can leave your thoughts about the mailing list and bot
> in
> > this thread.
> >
> > Best,
> > Jark
> >
>


Re: [VOTE] Accept Stateful Functions into Apache Flink

2019-10-31 Thread Gyula Fóra
+1 from me, this is a great addition to Flink!

Gyula

On Thu, Oct 31, 2019, 03:52 Yun Gao  wrote:

> +1 (non-binding)
> Very thanks for bringing this to the community!
>
>
> --
> From:jincheng sun 
> Send Time:2019 Oct. 31 (Thu.) 10:22
> To:dev 
> Cc:Vasiliki Kalavri 
> Subject:Re: [VOTE] Accept Stateful Functions into Apache Flink
>
> big +1 (binding)
>
> Andrey Zagrebin 于2019年10月30日 周三23:45写道:
>
> > sorry, my +1 was non-binding; I was confused since it was not a committer vote
> but
> > a PMC one.
> >
> > On Wed, Oct 30, 2019 at 4:43 PM Chesnay Schepler 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > On 30/10/2019 15:25, Vasiliki Kalavri wrote:
> > > > +1 (binding) from me. I hope this is not too late :)
> > > >
> > > > Thank you for this great contribution!
> > > >
> > > > On Wed, 30 Oct 2019 at 14:45, Stephan Ewen  wrote:
> > > >
> > > >> Thank you all for voting.
> > > >>
> > > >> The voting period has passed, but only 13 PMC members have voted so
> > far,
> > > >> that is less than 2/3rd of the PMCs (17 members).
> > > >>
> > > >> I will take a few days to ping other members to vote, after that we
> > will
> > > >> gradually lower the threshold as per the process to account for
> > inactive
> > > >> members.
> > > >>
> > > >> Best,
> > > >> Stephan
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Tue, Oct 29, 2019 at 6:20 PM Seth Wiesman 
> > > wrote:
> > > >>
> > > >>> +1 (non-binding)
> > > >>>
> > > >>> Seth
> > > >>>
> > >  On Oct 23, 2019, at 9:31 PM, Jingsong Li 
> > > >> wrote:
> > >  +1 (non-binding)
> > > 
> > >  Best,
> > >  Jingsong Lee
> > > 
> > > > On Wed, Oct 23, 2019 at 9:02 PM Yu Li  wrote:
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Best Regards,
> > > > Yu
> > > >
> > > >
> > > >> On Wed, 23 Oct 2019 at 16:56, Haibo Sun 
> > wrote:
> > > >>
> > > >> +1 (non-binding)Best,
> > > >> Haibo
> > > >>
> > > >>
> > > >> At 2019-10-23 09:07:41, "Becket Qin" 
> > wrote:
> > > >>> +1 (binding)
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> Jiangjie (Becket) Qin
> > > >>>
> > > >>> On Tue, Oct 22, 2019 at 11:44 PM Tzu-Li (Gordon) Tai <
> > > > tzuli...@apache.org
> > > >>> wrote:
> > > >>>
> > >  +1 (binding)
> > > 
> > >  Gordon
> > > 
> > >  On Tue, Oct 22, 2019, 10:58 PM Zhijiang <
> > > >> wangzhijiang...@aliyun.com
> > >  .invalid>
> > >  wrote:
> > > 
> > > > +1 (non-binding)
> > > >
> > > > Best,
> > > > Zhijiang
> > > >
> > > >
> > > >
> > > --
> > > > From:Zhu Zhu 
> > > > Send Time:2019 Oct. 22 (Tue.) 16:33
> > > > To:dev 
> > > > Subject:Re: [VOTE] Accept Stateful Functions into Apache
> Flink
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Thanks,
> > > > Zhu Zhu
> > > >
> > > > Biao Liu  于2019年10月22日周二 上午11:06写道:
> > > >
> > > >> +1 (non-binding)
> > > >>
> > > >> Thanks,
> > > >> Biao /'bɪ.aʊ/
> > > >>
> > > >>
> > > >>
> > > >>> On Tue, 22 Oct 2019 at 10:26, Jark Wu 
> > > wrote:
> > > >>>
> > > >>> +1 (non-binding)
> > > >>>
> > > >>> Best,
> > > >>> Jark
> > > >>>
> > > >>> On Tue, 22 Oct 2019 at 09:38, Hequn Cheng <
> > > chenghe...@gmail.com
> > > > wrote:
> > >  +1 (non-binding)
> > > 
> > >  Best, Hequn
> > > 
> > >  On Tue, Oct 22, 2019 at 9:21 AM Dian Fu <
> > > > dian0511...@gmail.com>
> > > > wrote:
> > > > +1 (non-binding)
> > > >
> > > > Regards,
> > > > Dian
> > > >
> > > >> 在 2019年10月22日,上午9:10,Kurt Young  写道:
> > > >>
> > > >> +1 (binding)
> > > >>
> > > >> Best,
> > > >> Kurt
> > > >>
> > > >>
> > > >> On Tue, Oct 22, 2019 at 12:56 AM Fabian Hueske <
> > > > fhue...@gmail.com>
> > > > wrote:
> > > >>> +1 (binding)
> > > >>>
> > > >>> Am Mo., 21. Okt. 2019 um 16:18 Uhr schrieb Thomas
> Weise <
> > >  t...@apache.org
> > > >> :
> > >  +1 (binding)
> > > 
> > > 
> > >  On Mon, Oct 21, 2019 at 7:10 AM Timo Walther <
> > > > twal...@apache.org
> > > > wrote:
> > > > +1 (binding)
> > > >
> > > > Thanks,
> > > > Timo
> > > >
> > > >
> > > >> On 21.10.19 15:59, Till Rohrmann